Начало работы с асинхронными функциями в Python

Spread the love

Если вы совершено не знакомы с асинхронным программированием и хотите разобраться с этим максимально простым способом, это статья для вас. В статье рассказывается то такое синхронные и асинхронные программы, и их отличия.

Оригинальная статья: Doug Farrell  – Getting Started With Async Features in Python

Содержание

Вы слышали об асинхронном программировании на Python? Вам интересно узнать больше об асинхронных функциях Python и о том, как вы можете использовать их в своей работе? Возможно, вы даже пытались написать многопоточные программы и столкнулись с некоторыми проблемами. Если вы хотите понять, как использовать асинхронные функции Python, вы попали по адресу.

В этой статье вы узнаете:

  • Что такое синхронная программа
  • Что такое асинхронная программа
  • Для чего нужны асинхронные программы
  • Как использовать асинхронные функции Python

Все примеры кода в этой статье были протестированы с Python 3.7.2. Нажмите здесь, чтобы загрузить примеры кода, использованные в этой статье.

Понимание асинхронного программирования

Синхронная программа выполняется по одному шагу за раз. Даже с условными переходами, циклами и вызовами функций вы все равно можете думать о коде с точки зрения выполнения одного шага выполнения за раз. Когда каждый шаг завершен, программа переходит к следующему.

Вот два примера программ, которые работают таким образом:

  • Программы пакетной обработки часто создаются как синхронные программы. Вы получаете некоторую информацию, обрабатываете ее и создаете некоторую информацию. Шаги следуют один за другим, пока программа не достигнет желаемого результата. В программе нужно только обратить внимание на этапы и их порядок.
  • Программы командной строки – это небольшие быстрые процессы, которые выполняются в терминале. Эти сценарии используются для создания чего-либо, преобразования одной вещи во что-то другое, создания отчета или, возможно, вывода списка данных. Это может быть выражено как последовательность шагов программы, которые выполняются последовательно, пока программа не будет завершена.

Асинхронная программа ведет себя по-другому. Они все еще выполняются шаг за шагом. Разница в том, что система может не дождаться завершения шага выполнения, прежде чем перейти к следующему.

Это означает, что программа перейдет к следующим шагам выполнения, даже если предыдущий шаг еще не завершен и все еще выполняется в другом месте. Это также означает, что программа знает, что делать после завершения предыдущего шага.

Зачем нужно писать такие программы? Оставшаяся часть этой статьи поможет вам ответить на этот вопрос и даст вам инструменты, необходимые для элегантного решения интересных асинхронных задач.

Создание синхронного веб-сервера

Основная задача веб-сервера более или менее аналогична пакетной обработке. Сервер получает некоторый ввод, обработает его и создаст вывод. Написанный как синхронная программа, может работать как обычный веб-сервер.

Но это также был бы абсолютно ужасный веб-сервер.

Почему? В этом случае одна единица работы (ввод, обработка, вывод) – не единственная цель. Настоящая цель – как можно быстрее выполнить сотни или даже тысячи единиц работы. Это может происходить в течение длительных периодов времени, и несколько рабочих единиц могут даже прибыть все сразу.

Можно ли сделать синхронный веб-сервер лучше? Конечно, вы можете оптимизировать этапы выполнения так, чтобы вся входящая работа выполнялась как можно быстрее. К сожалению, у этого подхода есть ограничения. Результатом может быть веб-сервер, который не отвечает достаточно быстро, не справляется с достаточной работой, или даже тот, который зависает, когда работа накапливается.

Примечание. Существуют и другие ограничения, с которыми вы можете столкнуться, если попытаетесь использовать вышеуказанный подход. К ним относятся скорость сети, скорость ввода-вывода файлов, скорость запросов к базе данных и скорость других подключенных служб, и многие другие. Общим для всех них является то, что все они являются функциями ввода-вывода. Все эти элементы на порядки медленнее, чем скорость процессора.

В синхронной программе, если шаг выполнения запускает запрос к базе данных, то ЦП практически не используется, пока не будет возвращен запрос к базе данных. Для пакетно-ориентированных программ это не является приоритетом большую часть времени. Целью является обработка результатов этой операции ввода-вывода. Часто это может занять больше времени, чем сама операция ввода-вывода. Любые усилия по оптимизации будут сосредоточены на обработке, а не на IO.

Методы асинхронного программирования позволяют вашим программам использовать преимущества относительно медленных процессов ввода-вывода, освобождая ЦП для выполнения другой работы.

Иной взгляд на программирование

Когда вы начнете пытаться понять асинхронное программирование, вы можете увидеть много дискуссий о важности блокирования или написания неблокирующего кода. (Лично я изо всех сил пытался понять эти концепции.)

Что такое неблокирующий код? Что такое код блокировки? Могут ли ответы на эти вопросы помочь вам написать лучший веб-сервер? Если так, как это можно сделать? Давайте разберемся!

Написание асинхронных программ требует, чтобы вы по-другому относились к программированию. Хотя это новое мышление сразу может показаться сложным, но это также интересное упражнение. Это потому, что реальный мир почти полностью асинхронен, как и то, как вы взаимодействуете с ним.

Представьте себе, что вы – родитель, пытающийся сделать несколько вещей одновременно. Вы должны подсчитать последние траты по чековой книжке, постирать белье и присматривать за детьми. Так или иначе, вы можете делать все эти вещи одновременно, даже не задумываясь об этом! Давайте разберемся с этим:

  • Подсчет трат по чековой книжки – синхронная задача. Один шаг следует за другим, пока не будет сделано. Вы делаете всю работу самостоятельно.
  • Однако вы можете оторваться от чековой книжки, чтобы заняться стиркой. Вы выгружаете сушилку, перемещаете одежду из стиральной машины в сушилку и запускаете другую загрузку в стиральную машину.
  • Работа с стиральной машиной и сушилкой является синхронной задачей, но основная часть работы происходит после запуска стиральной машины и сушилки. Как только вы их запустите, вы можете уйти и вернуться к чековой книжки. На этом этапе задачи мойки и сушки стали асинхронными. Стиральная машина и сушилка будут работать независимо до тех пор, пока не сработает зуммер (уведомляя вас о том, что задача требует внимания).
  • Следить за своими детьми – еще одна асинхронная задача. После того, как им дано какое ли задание или они начали играть, они могут делать это самостоятельно большую часть времени. Это меняется, когда кто-то нуждается во внимании, например, когда кто-то проголодался или поранился. Когда один из ваших детей закричит в тревоге, вам нужно срочно отреагировать. Дети – долгосрочное задание с высоким приоритетом. Наблюдение за ними заменяет любые другие задачи, которые вы можете выполнять, например, проверку чековой книжки или стирку.

Эти примеры могут помочь проиллюстрировать понятия блокирующего и неблокирующего кода. Давайте подумаем об этом в терминах программирования. В этом примере вы похожи на процессор. Пока вы перемещаете белье, вы (процессор) заняты и не можете выполнять другую работу, например, подсчет чековой книжки. Но это нормально, потому что задача относительно быстрая.

С другой стороны, запуск стиральной машины и сушилки не мешает вам выполнять другие задачи. Это асинхронная функция, потому что вам не нужно ждать ее завершения. Как только это началось, вы можете вернуться к чему-то другому. Это называется переключением контекста (context switch): изменился контекст того, что вы делаете, и зуммер машины сообщит вам о будущем, когда задача стирки будет завершена.

Как человек, вы, естественно, можете манипулировать несколькими вещами одновременно, часто не задумываясь об этом. Как разработчик, уловка состоит в том, как преобразовать такое поведение в код, который делает то же самое.

Программирование родителей: не так просто, как кажется!

Если вы узнаете себя (или своих родителей) в приведенном выше примере, это здорово! Вы получили представление о асинхронном программировании. Опять же, вы можете довольно легко переключать контексты между конкурирующими задачами, выбирая некоторые задачи и возобновляя другие. Теперь вы попытаемся запрограммировать это поведение для виртуальных родителей!

Мысленный эксперимент № 1: Синхронный родитель

Как бы вы создали родительскую программу для выполнения вышеуказанных задач синхронно? Поскольку наблюдение за детьми является первоочередной задачей, возможно, ваша программа сделает именно это. Родитель присматривает за детьми, ожидая чего-то, что может потребовать их внимания. Тем не менее, ничего другого (как чековая книжка или прачечная) не будет сделано в этом сценарии.

Теперь вы можете назначать приоритеты задачам так, как вам хочется, но только один из них может произойти в любой момент времени. Это результат синхронного, пошагового подхода. Как и синхронный веб-сервер, описанный выше, это будет работать, но, возможно, это не лучший способ. Родитель не сможет выполнить другие задания, пока дети не уснут. Все другие задачи будут выполняться позже, до поздней ночи. (Через пару недель этого и многие настоящие родители могут выпрыгнуть из окна!)

Мысленный эксперимент № 2: Родитель опроса

Если бы вы использовали опрос (polling), то вы могли бы изменить положение вещей так, чтобы было выполнено несколько задач. При таком подходе родитель периодически отрывается от текущей задачи и проверяет, нужно ли уделять внимание каким-либо другим задачам.

Давайте сделаем интервал опроса примерно пятнадцать минут. Теперь каждые пятнадцать минут ваш родитель проверяет, не нужно ли уделить внимание стиральной машине, сушилке или детям. Если нет, то родитель может вернуться к работе с чековой книжкой. Однако, если какое-либо из этих заданий требует внимания, родитель позаботится об этом, прежде чем вернуться к чековой книжке. Этот цикл продолжается до следующего тайм-аута из цикла опроса.

Такой подход лучше чем прежний, так как внимание уделяется множеству задач. Однако есть пара проблем:

  1. Родитель может потратить много времени на проверку вещей, которые не требуют внимания: например когда стиральная машина и сушилка еще в процессе работы, или детям не нужно никакого внимания, если не произойдет что-то непредвиденное.
  2. Родитель может пропустить выполненные задачи, которые требуют внимания: например, если посудомоечная машина закончила свой цикл в начале интервала опроса, она не будет получать никакого внимания в течение пятнадцати минут! Более того, присматривать за детьми якобы самая приоритетная задача. Они не будут терпеть пятнадцать минут без внимания, когда что-то могло пойти не так.

Вы можете решить эти проблемы, сократив интервал опроса, но теперь ваш родитель (процессор) будет тратить больше времени на переключение контекста между задачами. Это когда вы начинаете достигать точки убывающей отдачи. (Еще раз, пару недель, живя так и, ну… смотрите предыдущий комментарий о окнах и прыжках.)

Мысленный эксперимент № 3: Родитель как поток (Threading)

«Если бы я мог клонировать себя…». Если вы родитель, то у вас, вероятно, были похожие мысли! Поскольку вы программируете виртуальных родителей, вы можете сделать это, используя многопоточность (threading). Это механизм, который позволяет одновременно выполнять несколько разделов одной программы. Каждый раздел кода, который выполняется независимо, называется потоком (thread), и все потоки разделяют одно и то же пространство памяти.

Если вы рассматриваете каждую задачу как часть одной программы, вы можете разделить их и запустить как потоки. Другими словами, вы можете «клонировать» родителя, создавая один экземпляр для каждой задачи: наблюдать за детьми, следить за стиральной машиной, следить за сушилкой и подсчитывать чековую книжку. Все эти «клоны» работают независимо.

Это звучит как довольно хорошее решение, но здесь есть и некоторые проблемы. Одним из них является то, что вам придется явно указывать каждому родительскому экземпляру, что делать в вашей программе. Это может привести к некоторым проблемам, поскольку все экземпляры совместно используют все программное пространство.

Например, скажем, что родитель A контролирует сушилку. Родитель А видит, что одежда сухая, поэтому он берет под контроль сушилку и начинает выгружать одежду. В то же время родитель B видит, что стирка завершена, и он начинает готовить одежду для новой стирки. Для этого, родитель B также должен взять под контроль сушилку, чтобы он мог положить мокрую одежду внутрь. Но этого не может быть, потому что родитель A в настоящее время контролирует сушилку.

Через некоторое время родитель А завершил выгрузку одежды. Теперь они хотят взять под контроль стиральную машину и начать переносить одежду в пустую сушилку. Этого также не может быть, потому что родитель B в настоящее время контролирует стиральную машину!

Эти два родителя зашли в тупик (deadlocked). Оба имеют контроль над своим собственным ресурсом и хотят контролировать другой ресурс. Они будут ждать вечно, пока другой родительский экземпляр не освободит контроль. Как программист, вы должны написать код, чтобы решить эту ситуацию.

Примечание. Многопоточные программы позволяют создавать несколько параллельных путей выполнения, которые совместно используют одно и то же пространство памяти. Это и преимущество, и недостаток. Любая память, совместно используемая потоками, подчиняется одному или нескольким потокам, пытающимся одновременно использовать одну и ту же общую память. Это может привести к повреждению данных, чтению данных в недопустимом состоянии и к просто беспорядочным данным в целом.

В многопоточном программировании переключение контекста происходит под управлением системы, а не программиста. Система контролирует, когда переключать контексты и когда предоставлять потокам доступ к общим данным, тем самым изменяя контекст использования памяти. Всеми этими проблемами можно управлять в многопоточном коде, но трудно разобраться, а отладить еще труднее.

Вот еще одна проблема, которая может возникнуть из-за многопоточности. Предположим, что ребенок получил травму и нуждается в неотложной помощи. Родителю С было поручено присматривать за детьми, поэтому он сразу же забирает ребенка. При оказании неотложной помощи Родителю C необходимо выписать достаточно большой чек, чтобы покрыть расходы на посещение врача.

Тем временем родитель D дома работает над чековой книжкой. Он не знают о написании этого большого чека, поэтому он очень удивиться, когда возникнет новое списание чекового счета!

Помните, что эти два родительских экземпляра работают в одной программе. Семейный чековый счет является общим ресурсом, поэтому вам нужно найти способ, чтобы родитель, наблюдающий за ребенком, проинформировал родителя, который занимается чековой книжкой. В противном случае вам потребуется предоставить какой-то механизм блокировки, чтобы ресурс чековой книжки мог использовать только один родитель за раз с обновлениями.

Использование возможностей Python Async на практике

Теперь мы собираемся воспользоваться некоторыми из подходов, изложенных в мысленных экспериментах выше, и превратить их в работающие программы на Python.

Все примеры в этой статье были протестированы с Python 3.7.2. Файл requirements.txt указывает, какие модули вам нужно установить, чтобы запустить все примеры.

Синхронное программирование

В этом примере мы будем извлекать work из очереди и обрабатывать ее. Очередь в Python – это структура данных FIFO (первым пришел – первым обслужен). Он предоставляет методы, чтобы поместить вещи в очередь и вывести их снова в том порядке, в котором они были помещены.

В нашем примере работа состоит в том, чтобы получить номер из очереди и рассчитать количество циклов до этого числа. Когда начинается цикл мы выведем сообщение на экран, а также при будет сообщение о завершение. Эта программа демонстрирует способ обработки нескольких синхронных задач в очереди.

Программа с именем example_1.py полностью указана ниже:

import queue

def task(name, work_queue):
    if work_queue.empty():
        print(f"Task {name} nothing to do")
    else:
        while not work_queue.empty():
            count = work_queue.get()
            total = 0
            print(f"Task {name} running")
            for x in range(count):
                total += 1
            print(f"Task {name} total: {total}")

def main():
    """
    This is the main entry point for the program.
    """
    # Create the queue of 'work'
    work_queue = queue.Queue()

    # Put some 'work' in the queue
    for work in [15, 10, 5, 2]:
        work_queue.put(work)

    # Create some synchronous tasks
    tasks = [
        (task, "One", work_queue),
        (task, "Two", work_queue)
    ]

    # Run the tasks
    for t, n, q in tasks:
        t(n, q)

if __name__ == "__main__":
    main()

Давайте посмотрим, что делает каждая строка:

  • Строка 1 импортирует модуль queue. В ней программа будет хранит work, которая должна быть выполнена задачами task.
  • Строки с 3 по 13 определяют task(). Эта функция извлекает work из work_queue и обрабатывает ее до тех пор, пока больше не нужно ничего делать.
  • Строка 15 определяет main() для запуска задач программы.
  • Строка 20 создает work_queue. Все задачи используют этот общий ресурс для извлечения работы.
  • Строки с 23 по 24 помещают work в work_queue. В данном случае это просто случайное количество значений для задач, которые нужно обработать.
  • Строки с 27 по 29 создают список кортежей задач со значениями параметров, которые эти задачи будут переданы.
  • Строки с 33 по 34 перебирают список кортежей задач, вызывая каждый из них и передавая ранее определенные значения параметров.
  • Строка 36 вызывает main() для запуска программы.

task в этой программе – просто функция, принимающая строку и очередь в качестве параметров. При выполнении она ищет что-либо в очереди для обработки. Если есть над чем поработать, она извлекает значения из очереди, запускает цикл for для подсчета до этого значения и выводит итоговое значение в конце. Она продолжает получать работу из очереди, пока не останется ничего и не завершится.

При запуске эта программа выдаст следующий результат:

Task One running
Task One total: 15
Task One running
Task One total: 10
Task One running
Task One total: 5
Task One running
Task One total: 2
Task Two nothing to do

Это показывает, что Task One делает всю работу. Цикл while, в который попадает Task One внутри task(), совершает всю работу в очереди и обрабатывает ее. Когда этот цикл завершается, Task Two получает шанс на выполнение. Однако он обнаруживает, что очередь пуста, поэтому Task Two говорит, что ему нечего делать, и затем завершается. В коде нет ничего, что позволяло бы Task One и Task Two переключать контексты и работать вместе.

Простой кооперативный параллелизм

Следующая версия программы позволит двум задачам работать вместе. Добавление оператора yield означает, что цикл передаст управление в указанной точке, сохраняя при этом свой контекст. Таким образом, уступающая задача может быть возобновлена позже.

Оператор yield превращает task() в генератор (generator). Функция генератора вызывается так же, как и любая другая функция в Python, но когда выполняется оператор yield, управление возвращается вызывающей функции. По сути, это переключение контекста, поскольку управление переходит от функции генератора к вызывающей стороне.

Интересная часть заключается в том, что управление можно вернуть функции генератора, вызвав next() в генераторе. Это переключение контекста обратно к функции генератора, которая запускает выполнение со всеми переменными функции, которые были определены до того, как yield все еще остается неизменным.

Цикл while в main() использует это при вызове next(t). Это утверждение перезапускает задачу с того места, где оно было ранее выполнено. Все это означает, что вы контролируете, когда происходит переключение контекста: когда оператор yield выполняется в task().

Это форма совместной многозадачности. Программа передает контроль над своим текущим контекстом, чтобы можно было запустить что-то еще. В этом случае она позволяет циклу while в main() запускать два экземпляра task() в качестве функции генератора. Каждый экземпляр выполняет работу из одной и той же очереди. Это довольно умно, но для достижения тех же результатов, что и в первой программе, нужно потрудиться. Программа example_2.py демонстрирует этот простой параллелизм и приведена ниже:

import queue

def task(name, queue):
    while not queue.empty():
        count = queue.get()
        total = 0
        print(f"Task {name} running")
        for x in range(count):
            total += 1
            yield
        print(f"Task {name} total: {total}")

def main():
    """
    This is the main entry point for the program.
    """
    # Create the queue of 'work'
    work_queue = queue.Queue()

    # Put some 'work' in the queue
    for work in [15, 10, 5, 2]:
        work_queue.put(work)

    # Create some tasks
    tasks = [
        task("One", work_queue),
        task("Two", work_queue)
    ]

    # Run the tasks
    done = False
    while not done:
        for t in tasks:
            try:
                next(t)
            except StopIteration:
                tasks.remove(t)
            if len(tasks) == 0:
                done = True

if __name__ == "__main__":
    main()

Вот что происходит в коде выше:

  • Строки с 3 по 11 определяют task(), как и раньше, но добавление yield в строке 10 превращает функцию в генератор. В этом случае происходит переключение контекста и управление возвращается обратно в цикл while в main().
  • Строки с 25 по 28 создают список задач, но немного иначе, чем вы видели в предыдущем примере кода. В этом случае каждая задача вызывается с параметрами, указанными в переменной tasks. Это необходимо для запуска функции генератора task() в первый раз.
  • Строки с 34 по 39 – это модификации цикла while в main(), которые позволяют task() работать совместно. То есть управление возвращается к каждому экземпляру task(), тем самым, позволяя циклу продолжить работу и запустить другую задачу.
  • Строка 35 возвращает управление task() и продолжает его выполнение после точки вызова yield.
  • Строка 39 устанавливает готовую переменную. Цикл while заканчивается, когда все задачи завершены и удалены из tasks.

Это вывод, полученный при запуске этой программы:

Task One running
Task Two running
Task Two total: 10
Task Two running
Task One total: 15
Task One running
Task Two total: 5
Task One total: 2

Вы можете видеть, как Task One и Task Two работают и выполняют работу из очереди. Обе задачи обрабатывают работу, и каждая отвечает за два элемента в очереди. Это интересно, но опять же, для достижения этих результатов требуется немало усилий.

Хитрость здесь заключается в использовании оператора yield, который превращает task() в генератор и выполняет переключение контекста. Программа использует этот переключатель контекста для управления циклом while в main(), позволяя двум экземплярам задачи работать совместно.

Обратите внимание, как Task Two выводит итоговую сумму первой. Это может привести вас к мысли, что задачи выполняются асинхронно. Тем не менее, это все еще синхронная программа. Она структурирована так, что две задачи могут обмениваться контекстами взад и вперед. Причина, по которой Task Two выводит итоговую сумму в первую очередь, заключается в том, что она считает только до 10, а Task One – до 15. Task Two просто достигает своей первой суммы, поэтому она выводит свои выходные данные на консоль до Task One.

Совместный параллелизм с блокировкой вызовов

Следующая версия программы такая же, как и предыдущая, за исключением добавления time.sleep(delay) в теле цикла задач. Это добавляет задержку, основанную на значении, полученном из рабочей очереди, к каждой итерации цикла задачи. Задержка добавлена для имитации эффекта блокирующего вызова, происходящего в вашей задаче.

Блокирующий вызов – это код, который останавливает ЦП от каких-либо действий в течение некоторого времени. В вышеупомянутых мысленных экспериментах, если родитель не мог оторваться от проверки чековой книжки до тех пор, пока она не была завершена, это был бы блокирующий вызов.

time.sleep (delay) делает то же самое в этом примере, потому что CPU не может ничего сделать, кроме как ждать истечения задержки.

elapsed_time предоставляет способ получить истекшее время от момента создания экземпляра класса до его вызова в качестве функции. Программа example_3.py указана ниже:

import time
import queue
from lib.elapsed_time import ET

def task(name, queue):
    while not queue.empty():
        delay = queue.get()
        et = ET()
        print(f"Task {name} running")
        time.sleep(delay)
        print(f"Task {name} total elapsed time: {et():.1f}")
        yield

def main():
    """
    This is the main entry point for the program.
    """
    # Create the queue of 'work'
    work_queue = queue.Queue()

    # Put some 'work' in the queue
    for work in [15, 10, 5, 2]:
        work_queue.put(work)

    tasks = [
        task("One", work_queue),
        task("Two", work_queue)
    ]

    # Run the tasks
    et = ET()
    done = False
    while not done:
        for t in tasks:
            try:
                next(t)
            except StopIteration:
                tasks.remove(t)
            if len(tasks) == 0:
                done = True

    print(f"\nTotal elapsed time: {et():.1f}")

if __name__ == "__main__":
    main()

Вот что отличается в коде выше:

  • Строка 1 импортирует модуль time, чтобы дать программе доступ к time.sleep().
  • Строка 11 изменяет task(), чтобы включить time.sleep(delay), чтобы имитировать задержку ввода-вывода. Это меняет цикл for, который выполнял подсчет в example_1.py.

Когда вы запустите эту программу, вы увидите следующий вывод:

Task One running
Task One total elapsed time: 15.0
Task Two running
Task Two total elapsed time: 10.0
Task One running
Task One total elapsed time: 5.0
Task Two running
Task Two total elapsed time: 2.0

Total elapsed time: 32.01021909713745

Как и раньше, выполняются как Task One, так и Task Two, которые выполняют работу из очереди и обрабатывают ее. Однако даже с добавлением задержки вы можете видеть, что совместный параллелизм ничего вам не дал. Задержка останавливает обработку всей программы, и ЦП просто ожидает окончания задержки ввода-вывода.

Совместный параллелизм с неблокирующими вызовами

Следующая версия программы была немного изменена. Она использует асинхронные функции Python 3, asyncio/await.

Модули time и queue были заменены пакетом asyncio. Это дает вашей программе доступ к асинхронному дружественному (неблокирующему) режиму сна и очереди. Изменение в task() заключаются в добавление async в строке 4. Это указывает Python, что функция будет асинхронной.

Другим большим изменением является удаление операторов time.sleep (delay) и yield и замена их на await asyncio.sleep (delay). Это создает неблокирующую задержку, которая выполнит переключение контекста обратно к вызывающей функции main().

Цикл while внутри main() больше не существует. Вместо task_array есть вызов await asyncio.gather(…). Это говорит asyncio две вещи:

  1. Создайте две задачи на основе task() и запустите их.
  2. Подождите, пока обе из них будут завершены, прежде чем двигаться вперед.

Последняя строка программы asyncio.run(main ()) запускает main(). Это создает так называемый цикл обработки событий event loop). Именно этот цикл запустит main(), который, в свою очередь, запустит два экземпляра task().

В основе асинхронной системы Python лежит цикл событий. Он запускает весь код, включая main(). Когда код задачи выполняется, процессор занят выполнением работы. Когда достигается ключевое слово await, происходит переключение контекста, и управление возвращается обратно в цикл событий. Цикл обработки событий просматривает все задачи, ожидающие события (в данном случае тайм-аут asyncio.sleep (delay)), и передает управление задаче с событием, которое готово.

await asyncio.sleep (delay) неблокирует процессор. Вместо ожидания истечения времени ожидания, ЦП регистрирует событие сна в очереди задач цикла событий и выполняет переключение контекста, передавая управление в цикл событий. Цикл событий непрерывно ищет завершенные события и передает управление задаче, ожидающей этого события. Таким образом, процессор может оставаться занятым, если работа доступна, а цикл обработки событий отслеживает события, которые произойдут в будущем.

Примечание. Асинхронная программа выполняется в одном потоке выполнения. Переключение контекста с одного раздела кода на другой, который может повлиять на данные, полностью под вашим контролем. Это означает, что вы контролируете весь доступ к данным совместно используемой памяти перед выполнением переключения контекста. Это упрощает проблему общей памяти, присущую многопоточному коду.

Код example_4.py приведен ниже:

import asyncio
from lib.elapsed_time import ET

async def task(name, work_queue):
    while not work_queue.empty():
        delay = await work_queue.get()
        et = ET()
        print(f"Task {name} running")
        await asyncio.sleep(delay)
        print(f"Task {name} total elapsed time: {et():.1f}")

async def main():
    """
    This is the main entry point for the program.
    """
    # Create the queue of 'work'
    work_queue = asyncio.Queue()

    # Put some 'work' in the queue
    for work in [15, 10, 5, 2]:
        await work_queue.put(work)

    # Run the tasks
    et = ET()
    await asyncio.gather(
        asyncio.create_task(task("One", work_queue)),
        asyncio.create_task(task("Two", work_queue)),
    )
    print(f"\nTotal elapsed time: {et():.1f}")

if __name__ == "__main__":
    asyncio.run(main())

Вот что отличается между этой программой и example_3.py:

  • Строка 1 импортирует asyncio для получения доступа к функциональности Python async.
  • В строке 4 показано добавление ключевого слова async перед определением task(). Это сообщает программе, что task может выполняться асинхронно.
  • Строка 9 заменяет time.sleep(delay) неблокирующим asyncio.sleep(delay), который также возвращает управление (или переключает контексты) обратно в цикл основного события.
  • Строка 17 создает неблокирующую асинхронную work_queue.
  • Строки с 20 по 21 асинхронно выполняют работу в work_queue с использованием ключевого слова await.
  • Строки с 25 по 28 создают две задачи и собирают их вместе, поэтому программа будет ожидать завершения обеих задач.
  • В строке 32 программа запускается асинхронно. Он также запускает внутренний цикл событий.

Когда вы посмотрите на вывод этой программы, обратите внимание на то, как запускаются одновременно и Task One, и Task Two:

Task One running
Task Two running
Task Two total elapsed time: 10.0
Task Two running
Task One total elapsed time: 15.0
Task One running
Task Two total elapsed time: 5.0
Task One total elapsed time: 2.0

Total elapsed time: 17.0

Это указывает на то, что await asyncio.sleep (delay) не является блокирующим, и что выполняется обе работы.

В конце программы вы заметите, что общее затраченное время по сути составляет половину времени, которое потребовалось для запуска example_3.py. В этом преимущество программы, которая использует асинхронные функции Python! Каждое задание можно было запускать в ожидании await asyncio.sleep(delay) одновременно. Общее время выполнения программы теперь меньше суммы ее частей. Вы оторвались от синхронной модели!

Синхронные (блокирующие) HTTP-вызовы

Следующая версия программы – это как шаг вперед, так и шаг назад. Программа выполняет некоторую реальную работу с реальным вводом-выводом, отправляя HTTP-запросы на список URL-адресов и получая содержимое страницы. Однако это происходит блокирующим (синхронным) образом.

Программа была изменена, чтобы импортировать замечательный модуль requests, чтобы сделать запросы HTTP. Кроме того, очередь теперь содержит список URL-адресов, а не номеров. Кроме того, task() больше не увеличивает счетчик. Вместо этого requests получают содержимое URL-адреса, полученного из очереди, и печатают, сколько времени потребовалось для этого.

Код example_5.py приведен ниже:

import queue
import requests
from lib.elapsed_time import ET

def task(name, work_queue):
    with requests.Session() as session:
        while not work_queue.empty():
            url = work_queue.get()
            print(f"Task {name} getting URL: {url}")
            et = ET()
            session.get(url)
            print(f"Task {name} total elapsed time: {et():.1f}")
            yield

def main():
    """
    This is the main entry point for the program.
    """
    # Create the queue of 'work'
    work_queue = queue.Queue()

    # Put some 'work' in the queue
    for url in [
        "http://google.com",
        "http://yahoo.com",
        "http://linkedin.com",
        "http://apple.com",
        "http://microsoft.com",
        "http://facebook.com",
        "http://twitter.com"
    ]:
        work_queue.put(url)

    tasks = [
        task("One", work_queue),
        task("Two", work_queue)
    ]

    # Run the tasks
    et = ET()
    done = False
    while not done:
        for t in tasks:
            try:
                next(t)
            except StopIteration:
                tasks.remove(t)
            if len(tasks) == 0:
                done = True

    print(f"\nTotal elapsed time: {et():.1f}")

if __name__ == "__main__":
    main()

Вот что происходит в этой программе:

  • Строка 2 импортирует requests, что обеспечивает удобный способ совершать HTTP-вызовы.
  • В строке 11 вводится задержка, похожая на example_3.py. Однако на этот раз он вызывает session.get(url), который возвращает содержимое URL, полученного из work_queue.
  • Строки с 23 по 32 помещают список URL в work_queue.

Когда вы запустите эту программу, вы увидите следующий вывод:

Task One getting URL: http://google.com
Task One total elapsed time: 0.3
Task Two getting URL: http://yahoo.com
Task Two total elapsed time: 0.8
Task One getting URL: http://linkedin.com
Task One total elapsed time: 0.4
Task Two getting URL: http://apple.com
Task Two total elapsed time: 0.3
Task One getting URL: http://microsoft.com
Task One total elapsed time: 0.5
Task Two getting URL: http://facebook.com
Task Two total elapsed time: 0.5
Task One getting URL: http://twitter.com
Task One total elapsed time: 0.4

Total elapsed time: 3.2

Как и в более ранних версиях программы, yield превращает task() в генератор. Он также выполняет переключение контекста, позволяющее запустить другой экземпляр задачи.

Каждая задача получает URL из рабочей очереди, извлекает содержимое страницы и сообщает, сколько времени потребовалось для получения этого содержимого.

Как и раньше, yield позволяет обеим задачам работать совместно. Однако, поскольку эта программа работает синхронно, каждый вызов session.get() блокирует ЦП, пока не будет получена страница. Обратите внимание на общее время, необходимое для запуска всей программы в конце. Это будет иметь смысл для следующего примера.

Асинхронные (неблокирующие) HTTP-вызовы

Эта версия программы модифицирует предыдущую версию для использования асинхронных функций Python. Она также импортирует модуль aiohttp, который является библиотекой для асинхронного выполнения HTTP-запросов с использованием asyncio.

Здесь задачи были изменены, был удалить вызов yield, поскольку код для выполнения вызова HTTP GET больше не блокируется. Он также выполняет переключение контекста обратно в цикл событий.

Программа example_6.py приведена ниже:

import asyncio
import aiohttp
from lib.elapsed_time import ET

async def task(name, work_queue):
    async with aiohttp.ClientSession() as session:
        while not work_queue.empty():
            url = await work_queue.get()
            print(f"Task {name} getting URL: {url}")
            et = ET()
            async with session.get(url) as response:
                await response.text()
            print(f"Task {name} total elapsed time: {et():.1f}")

async def main():
    """
    This is the main entry point for the program.
    """
    # Create the queue of 'work'
    work_queue = asyncio.Queue()

    # Put some 'work' in the queue
    for url in [
        "http://google.com",
        "http://yahoo.com",
        "http://linkedin.com",
        "http://apple.com",
        "http://microsoft.com",
        "http://facebook.com",
        "http://twitter.com",
    ]:
        await work_queue.put(url)

    # Run the tasks
    et = ET()
    await asyncio.gather(
        asyncio.create_task(task("One", work_queue)),
        asyncio.create_task(task("Two", work_queue)),
    )
    print(f"\nTotal elapsed time: {et():.1f}")

if __name__ == "__main__":
    asyncio.run(main())

Вот что происходит в этой программе:

  • Строка 2 импортирует библиотеку aiohttp, которая обеспечивает асинхронный способ выполнения HTTP-вызовов.
  • Строка 5 помечает task() как асинхронную функцию.
  • Строка 6 создает менеджер контекста сеанса aiohttp.
  • В строке 11 создается менеджер контекста ответа aiohttp. Он также выполняет HTTP-вызов GET для URL-адреса, взятого из work_queue.
  • Строка 12 использует ответ для асинхронного получения текста, полученного из URL.

Когда вы запустите эту программу, вы увидите следующий вывод:

Task One getting URL: http://google.com
Task Two getting URL: http://yahoo.com
Task One total elapsed time: 0.3
Task One getting URL: http://linkedin.com
Task One total elapsed time: 0.3
Task One getting URL: http://apple.com
Task One total elapsed time: 0.3
Task One getting URL: http://microsoft.com
Task Two total elapsed time: 0.9
Task Two getting URL: http://facebook.com
Task Two total elapsed time: 0.4
Task Two getting URL: http://twitter.com
Task One total elapsed time: 0.5
Task Two total elapsed time: 0.3

Total elapsed time: 1.7

Посмотрите на общее прошедшее время, а также на время получения содержимое каждого URL. Вы увидите, что длительность составляет примерно половину совокупного времени всех вызовов HTTP GET. Это связано с тем, что вызовы HTTP GET выполняются асинхронно. Другими словами, вы эффективно используете преимущества процессора, позволяя ему делать несколько запросов одновременно.

Поскольку процессор очень быстрый, этот пример может создать столько задач, сколько URL-адресов. В этом случае время выполнения программы будет соответствовать времени поиска единственного медленного URL.

Заключение

В этой статье мы рассказали о инструментах, необходимых для того, чтобы начать работать с методами асинхронного программирования. Использование асинхронных функций Python дает вам программный контроль, над переключением контекста. Это позволит вам решать более сложные проблемы, с которыми вы можете столкнуться в многопоточном программировании.

Асинхронное программирование – мощный инструмент, но он не полезен для всех типов программ. Например, если вы пишете программу, которая вычисляет число Пи с точностью до миллионных знаков после запятой, то асинхронный код вам не поможет. Такая программа связана с процессором, без большого количества операций ввода-вывода. Однако, если вы пытаетесь реализовать сервер или программу, которая выполняет ввод-вывод (например, доступ к файлам или сети), использование асинхронных функций Python может иметь огромное значение.

Подводя итог, в этой статье вы узнали:

  • Что такое синхронные программы
  • Чем такое асинхронные программы
  • Зачем вам может понадобится написать асинхронные программы?
  • Как использовать встроенные функции асинхронности в Python
Была ли вам полезна эта статья?
[8 / 5]

Spread the love
Подписаться
Уведомление о
guest
2 Комментарий
Oldest
Newest Most Voted
Inline Feedbacks
View all comments
чучу
чучу
4 лет назад

Отличная статья. Спасибо! Ждем еще подобных переводов по Python!

Roman
Roman
3 лет назад

спасибо за статью!
можно вместо queue использовать collections.deque и
попроще написать циклы

while tasks:
    for task in tasks:
        if next(task, True):
            tasks.remove(task)