Обзор Async IO в Python 3.7

Spread the love

Перевод обзорной статьи: Guest Contributor  Overview of Async IO in Python 3.7
Будет полезна для введения в модуль asyncio

Модуль Python 3 asyncio предоставляет фундаментальные инструменты для реализации асинхронного ввода-вывода в Python. Он был представлен в Python 3.4, и с каждым последующим релизом модуль развивался.

Это статья содержит общий обзор асинхронной парадигмы и того, как она реализована в Python 3.7.

Блокирующий и неблокирующий ввод/вывод

Проблема, которую пытается решить асинхронностью, – это блокировка ввода-вывода.

По умолчанию, когда ваша программа обращается к данным из источника ввода-вывода, она ожидает завершения этой операции, прежде чем продолжить выполнение программы.

with open('myfile.txt', 'r') as file:  
    data = file.read()
    # Until the data is read into memory, the program waits here
print(data)  

Программа заблокирована от продолжения выполнения во время доступа к физическому устройству и передачи данных.

Еще одним распространенным примером блокировки являются сетевые операции:

# pip install --user requests
import requests

req = requests.get('https://www.stackabuse.com/')

#
# Blocking occurs here, waiting for completion of an HTTPS request
#

print(req.text)  

Во многих случаях задержка, вызванная блокировкой, незначительна. Однако блокировка ввода/вывода очень плохо масштабируется. Если вам нужно дождаться чтения файла 1010 или сетевых транзакций, производительность заметно снизится.

Многопроцессорность, многопоточность и асинхронность

Стратегии минимизации задержек блокирования ввода-вывода делятся на три основные категории: многопроцессорная обработка (multiprocessing), многопоточность (threading) и асинхронность.

Многопроцессорная обработка

Многопроцессорная обработка – это форма параллельных вычислений при котором инструкции выполняются в перекрывающихся временных рамках на нескольких физических процессорах или ядрах. Каждый процесс, порожденный ядром, несет накладные расходы, включая независимо выделенный кусок памяти (heap).

Python реализует такой параллелизм с помощью модуля multiprocessing.

Ниже приведен пример программы на Python 3, которая порождает четыре дочерних процесса, каждый из которых имеет случайную независимую задержку. Выходные данные показывают идентификатор процесса каждого дочернего элемента, системное время до и после каждой задержки, а также текущее и пиковое распределение памяти на каждом шаге.

from multiprocessing import Process  
import os, time, datetime, random, tracemalloc

tracemalloc.start()  
children = 4    # number of child processes to spawn  
maxdelay = 6    # maximum delay in seconds

def status():  
    return ('Time: ' + 
        str(datetime.datetime.now().time()) +
        '\t Malloc, Peak: ' +
        str(tracemalloc.get_traced_memory()))

def child(num):  
    delay = random.randrange(maxdelay)
    print(f"{status()}\t\tProcess {num}, PID: {os.getpid()}, Delay: {delay} seconds...")
    time.sleep(delay)
    print(f"{status()}\t\tProcess {num}: Done.")

if __name__ == '__main__':  
    print(f"Parent PID: {os.getpid()}")
    for i in range(children):
        proc = Process(target=child, args=(i,))
        proc.start()

Результат выполнения:

Parent PID: 16048  
Time: 09:52:47.014906    Malloc, Peak: (228400, 240036)     Process 0, PID: 16051, Delay: 1 seconds...  
Time: 09:52:47.016517    Malloc, Peak: (231240, 240036)     Process 1, PID: 16052, Delay: 4 seconds...  
Time: 09:52:47.018786    Malloc, Peak: (231616, 240036)     Process 2, PID: 16053, Delay: 3 seconds...  
Time: 09:52:47.019398    Malloc, Peak: (232264, 240036)     Process 3, PID: 16054, Delay: 2 seconds...  
Time: 09:52:48.017104    Malloc, Peak: (228434, 240036)     Process 0: Done.  
Time: 09:52:49.021636    Malloc, Peak: (232298, 240036)     Process 3: Done.  
Time: 09:52:50.022087    Malloc, Peak: (231650, 240036)     Process 2: Done.  
Time: 09:52:51.020856    Malloc, Peak: (231274, 240036)     Process 1: Done.  

Многопоточность

Потоки являются альтернативой многопроцессорности, с преимуществами и недостатками.

Потоки независимо планируются, и их выполнение может происходить в течение перекрывающегося периода времени. Однако, в отличие от многопроцессорной обработки, потоки существуют полностью в одном процессе ядра и совместно используют одну выделенную память (heap).

Потоки Python являются параллельными (concurrent) – несколько последовательностей машинного кода выполняются в перекрывающихся временных рамках. Но в реальности они не параллельны – выполнение не происходит одновременно на нескольких физических ядрах.

Основными недостатками потоков Python являются безопасность памяти (memory safety) и состояние гонки (race conditions). Все дочерние потоки родительского процесса работают в одном и том же пространстве общей памяти. Без дополнительных средств защиты один поток может перезаписать общее значение в памяти, и другие потоки об этом не узнают. Такое повреждение данных будет иметь катастрофические последствия.

Для обеспечения безопасности потоков в реализации CPython используют глобальную блокировку интерпретатора (GIL). GIL – это мьютексный механизм, который предотвращает одновременное выполнение нескольких потоков на объектах Python. Фактически это означает, что в любой момент времени выполняется только один поток.

Вот потоковая версия примера многопроцессорной обработки из предыдущего раздела. Обратите внимание, что очень мало что изменилось: multiprocessing.Process заменен на threading.Thread. Как указано в выходных данных, все происходит за один процесс, и объем памяти значительно уменьшается.

from threading import Thread  
import os, time, datetime, random, tracemalloc

tracemalloc.start()  
children = 4    # number of child threads to spawn  
maxdelay = 6    # maximum delay in seconds

def status():  
    return ('Time: ' + 
        str(datetime.datetime.now().time()) +
        '\t Malloc, Peak: ' +
        str(tracemalloc.get_traced_memory()))

def child(num):  
    delay = random.randrange(maxdelay)
    print(f"{status()}\t\tProcess {num}, PID: {os.getpid()}, Delay: {delay} seconds...")
    time.sleep(delay)
    print(f"{status()}\t\tProcess {num}: Done.")

if __name__ == '__main__':  
    print(f"Parent PID: {os.getpid()}")
    for i in range(children):
        thr = Thread(target=child, args=(i,))
        thr.start()

Результат выполнения:

Parent PID: 19770  
Time: 10:44:40.942558    Malloc, Peak: (9150, 9264)     Process 0, PID: 19770, Delay: 3 seconds...  
Time: 10:44:40.942937    Malloc, Peak: (13989, 14103)       Process 1, PID: 19770, Delay: 5 seconds...  
Time: 10:44:40.943298    Malloc, Peak: (18734, 18848)       Process 2, PID: 19770, Delay: 3 seconds...  
Time: 10:44:40.943746    Malloc, Peak: (23959, 24073)       Process 3, PID: 19770, Delay: 2 seconds...  
Time: 10:44:42.945896    Malloc, Peak: (26599, 26713)       Process 3: Done.  
Time: 10:44:43.945739    Malloc, Peak: (26741, 27223)       Process 0: Done.  
Time: 10:44:43.945942    Malloc, Peak: (26851, 27333)       Process 2: Done.  
Time: 10:44:45.948107    Malloc, Peak: (24639, 27475)       Process 1: Done.  

Асинхронность

Асинхронность является альтернативой многопоточности для написания параллельных приложений. Асинхронные события происходят независимо друг от друга (не синхронизированно друг с другом), полностью в одном потоке.

В отличие от многопоточности, в асинхронных программах программист контролирует, когда и как происходит произвольное вытеснение, облегчая изоляцию и избегая условий гонки.

Введение в модуль Python 3.7 asyncio

В Python 3.7 асинхронные операции предоставляются модулем asyncio.

High-Level против Low-Level asyncio API

Компоненты Asyncio подразделяются на API-интерфейсы высокого уровня (для написания программ) и API-интерфейсы низкого уровня (для написания библиотек или сред на основе asyncio).

Каждая программа asyncio может быть написана с использованием только высокоуровневых API. Если вы не пишете фреймворк или библиотеку, вам никогда не нужно трогать API низкого уровня.

С учетом вышесказанного давайте рассмотрим основные высокоуровневые API и обсудим основные концепции.

Корутины (Coroutines)

В общем, coroutine (сокращение от cooperative subroutine) – это подпрограмма, предназначенная для добровольной упреждающей многозадачности: она активно уступает свои ресурсы другим подпрограммам и процессам, а не принудительно вытесняется ядром. Термин «coroutine» был придуман в 1958 году Мелвином Конвеем (в Conway’s Law), чтобы описать код, который сам освобождает свои ресурсы для других частей системы.

В asyncio это так же называется awaiting.

Awaitables, Async, и Await

Любой объект, который можно ожидать прерывание своего процесса выполнения, называется awaitable.

Ключевое слово await приостанавливает выполнение текущей подпрограммы (coroutine) и вызывает указанное ожидание awaitable.

В Python 3.7 есть три ожидаемых объекта (awaitable) – coroutine, task и future.

Coroutine в asyncio – это любая функция Python, в определении которой указан префикс async.

async def my_coro():  
    pass

task в asyncio – это объект, который оборачивает coroutine, предоставляя методы для контроля ее выполнения и запроса ее статуса. task может быть создан с помощью asyncio.create_task() или asyncio.gather().

future в asyncio – это низкоуровневый объект, который выполняет роль заполнителя для данных, которые еще не были рассчитаны или получены. Он может обеспечить пустую структуру для последующего заполнения данными и механизм обратного вызова, который срабатывает, когда данные готовы.

Обычно в Python 3.7 вам никогда не нужно напрямую создавать низкоуровневый объект future.

Event Loops

В asyncio event loop (цикл обработки событий) управляет планированием и передачей ожидаемых объектов. event loop требуется для использования awaitables. Каждая программа asyncio имеет как минимум один event loop. Можно иметь несколько event loop, но в Python 3.7 настоятельно рекомендуется использовать только один event loop.

Ссылка на работающий в данный момент объект цикла получается путем вызова asyncio.get_running_loop().

Sleeping

Подпрограмма asyncio.sleep(delay) блокируется на секунды задержки. Это используется для имитации блокировки ввода-вывода.

import asyncio

async def main():  
    print("Sleep now.")
    await asyncio.sleep(1.5)
    print("OK, wake up!")

asyncio.run(main())  
Инициализация главного Event Loop

Канонической точкой входа в программу asyncio является asyncio.run(main()), где main () – подпрограмма (coroutine) верхнего уровня.

import asyncio

async def my_coro(arg):  
    "A coroutine."  
    print(arg)

async def main():  
    "The top-level coroutine."
    await my_coro(42)

asyncio.run(main())  

Вызов asyncio.run() неявно создает и запускает event loop (цикл обработки событий). У объекта цикла есть много полезных методов, включая loop.time(), который возвращает число с плавающей запятой, представляющее текущее время, измеренное внутренними часами цикла.

Примечание. Функция asyncio.run() не может быть вызвана из существующего цикла событий. Следовательно, возможно, что вы увидите ошибки, если вы запускаете программу в контролирующей среде, такой как Anaconda или Jupyter, которая выполняет собственный цикл обработки событий. Примеры программ в этом разделе и следующих разделах должны запускаться непосредственно из командной строки путем выполнения файла python.

Следующая программа печатает строки текста, блокируясь на одну секунду после каждой строки.

import asyncio

async def my_coro(delay):  
    loop = asyncio.get_running_loop()
    end_time = loop.time() + delay
    while True:
        print("Blocking...")
        await asyncio.sleep(1)
        if loop.time() > end_time:
            print("Done.")
            break

async def main():  
    await my_coro(3.0)

asyncio.run(main())  

Результат выполнения:

Blocking...  
Blocking...  
Blocking...  
Done.  
Задачи (Task)

Задача – это awaitable (ожидаемый) объект, который оборачивается вокруг подпрограммы (coroutine). Чтобы создать и сразу запланировать задачу, вы можете вызвать следующее:

asyncio.create_task(coro(args...))  

Этот код вернет объект задачи. Создание задачи говорит циклу: «Иди и запусти эту coroutine (подпрограмму), как только сможешь».

Если вы ожидаете (await) задачу, выполнение текущей coroutine блокируется, пока эта задача не будет завершена.

import asyncio

async def my_coro(n):  
    print(f"The answer is {n}.")

async def main():  
    # Создав задачу, вы запланирование ее запуск
    # по усмотрению цикла событий.
    mytask = asyncio.create_task(my_coro(42))

    # Если мы позже, выполним await программа останавливается
    # пока задача не будет завершена.
    await mytask

asyncio.run(main())  

Результат выполнения:

The answer is 42.  

Задачи имеют несколько полезных методов для управления подпрограммами (coroutine). В частности, вы можете запросить отмену задачи, вызвав метод .cancel(). Задача будет запланирована для отмены в следующем проходе цикла событий. Отмена не гарантируется: задание может быть выполнено до прохода цикла, и в этом случае отмена не будет.

Сбор объектов Awaitable

Объекты awaitable могут быть собраны в группу, с помощью команды asyncio.gather(awaitables).

Asyncio.gather() возвращает объект awaitable, представляющее собранные awaitable значения.

Сбор – это удобный способ запланировать одновременное выполнение нескольких подпрограмм в качестве задач. Он также связывает собранные задачи несколькими полезными способами:

  • Когда все собранные задачи завершены, их совокупные возвращаемые значения возвращаются в виде списка, упорядоченного в соответствии с порядком списка awaitable.
  • Любая собранная задача может быть отменена без отмены других задач.
  • Сам сбор может быть отменен, отменяя все задачи.
Пример: асинхронные веб-запросы с aiohttp

В следующем примере показано, как можно реализовать это высокоуровневое асинхронное API. Ниже приведена измененная версия, обновленная для Python 3.7, примера Asyncio Скотта Робинсона (Scott Robinson’s nifty asyncio). Его программа использует модуль aiohttp для захвата верхних постов в Reddit и вывода их на консоль.

Убедитесь, что у вас установлен модуль aiohttp, прежде чем запускать скрипт ниже. Вы можете установить модуль с помощью следующей команды pip:

$ pip install --user aiohttp
import sys  
import asyncio  
import aiohttp  
import json  
import datetime

async def get_json(client, url):  
    async with client.get(url) as response:
        assert response.status == 200
        return await response.read()

async def get_reddit_top(subreddit, client, numposts):  
    data = await get_json(client, 'https://www.reddit.com/r/' + 
        subreddit + '/top.json?sort=top&t=day&limit=' +
        str(numposts))

    print(f'\n/r/{subreddit}:')

    j = json.loads(data.decode('utf-8'))
    for i in j['data']['children']:
        score = i['data']['score']
        title = i['data']['title']
        link = i['data']['url']
        print('\t' + str(score) + ': ' + title + '\n\t\t(' + link + ')')

async def main():  
    print(datetime.datetime.now().strftime("%A, %B %d, %I:%M %p"))
    print('---------------------------')
    loop = asyncio.get_running_loop()  
    async with aiohttp.ClientSession(loop=loop) as client:
        await asyncio.gather(
            get_reddit_top('python', client, 3),
            get_reddit_top('programming', client, 4),
            get_reddit_top('asyncio', client, 2),
            get_reddit_top('dailyprogrammer', client, 1)
            )

asyncio.run(main())  

Если вы запустите программу несколько раз, вы увидите, что порядок вывода изменится. Это связано с тем, что запросы JSON отображаются по мере их поступления, что зависит от времени ответа сервера и промежуточной задержки в сети. В системе Linux вы можете наблюдать это в действии, запустив скрипт с префиксом (например, watch -n 5), который будет обновлять вывод каждые 5 секунд:

Другое высокоуровневое API

Надеемся, что этот обзор даст вам основу понимания того, как, когда и зачем использовать asyncio. Другие высокоуровневые API-интерфейсы asyncio, которые здесь не рассматриваются, включают в себя:

  • stream, набор высокоуровневых сетевых примитивов для управления асинхронными событиями TCP.
  • lockeventcondition, асинхронные аналоги примитивов синхронизации, предусмотренных в модуле threading
  • subprocess, набор инструментов для запуска асинхронных подпроцессов, таких как команды оболочки.
  • queue, асинхронный аналог модуля queue.
  • exception, для обработки исключений в асинхронном коде.

Заключение

Имейте в виду, что даже если ваша программа не требует асинхронности, вы все равно можете использовать asyncio, по соображениям производительности. Я надеюсь, что этот обзор даст вам четкое представление о том, как, когда и почему начать использовать asyncio.

Была ли вам полезна эта статья?
[55 / 4.4]

Spread the love
Подписаться
Уведомление о
guest
6 Комментарий
Oldest
Newest Most Voted
Inline Feedbacks
View all comments
Дмитрий
Дмитрий
4 лет назад

Спасибо за хороший обзор – помог мне разобратся.

Степан
Степан
4 лет назад

ужасная статья , не читайте , зря потраченное время

Роман
Роман
3 лет назад

Хорошая статья. Даёт начальное понимание.

Александр
Александр
3 лет назад

Ничего не понял. Но очень интересно. Побольше бы разжевываний как работают примеры

Александр
Александр
3 лет назад

Спасибо! Наконец все понятно!

Сергей
Сергей
4 месяцев назад

Поправочка: точнее было назвать не многопроцессорность, а многопроцессность, ведь не о процессорах речь идет, а о процессах (multiprocessing).
Немного столку сбивает название абзаца.
В целом спасибо за статью, оч. помогла!