Хабрахабр

Что внутри asyncio

Мы оседлаем коллбэки и промчимся по циклу событий сквозь пару ключевых абстракций прямо в корутину. В этой статье я предлагаю читателю совершить со мной в меру увлекательное путешествие в недра asyncio, чтобы разобраться, как в ней реализовано асинхронное выполнение кода. Если на вашей карте питона еще нет этих достопримечательностей, добро пожаловать под кат.

Для затравки — краткая справка о раскинувшейся перед нами местности

pep3156 так же приписывает ей необходимость обеспечить предельно простую интеграцию в уже существовавшие асинхронные фреймворки (Twisted, Tornado, Gevent). asyncio — библиотека асинхронного ввода/вывода которая, согласно pep3153, была создана, чтобы предоставить стандартизованную базу для создания асинхронных фреймворков. 0, в Twisted asyncioreactor доступен с версии 16. Как мы можем сейчас наблюдать, эти цели были успешно достигнуты — появился новый фреймворк на основе asyncio: aiohttp, в Tornado AsyncioMainLoop является циклом событий по умолчанию с версии 5. 0, а для Gevent есть сторонняя библиотека aiogevent. 5.

В её основе лежат три основные абстракции, являющиеся аналогами абстракций, существующих в сторонних фреймворках: asyncio — это гибридная библиотека, использующая одновременно два подхода к реализации асинхронного выполнения кода: классический на коллбэка и относительно новый (по крайней мере для питона) на корутинах.

  • Pluggable Event Loop
    Подключаемый цикл событий. Подключаемый значит, что он может быть в две строчки кода заменен на другой, реализующий такой же интерфейс. Сейчас есть реализации на cython поверх libuv (uvloop) и на языке Rust (asyncio-tokio).
  • Future
    Результат операции, который будет доступен в будущем. Нужен, чтобы получать в корутинах результат выполнения коллбэков.
  • Task
    Специальный подкласс класса Future для запуска корутин на цикле событий.

Поехали!

Он большой и сложный, поэтому сначала рассмотрим его урезанный вариант. Цикл событий — основная составляющая библиотеки, по дорогам, пролегающим через него, данные доставляются в любые её компоненты.

# ~/inside_asyncio/base_loop.py import collections
import random class Loop: def __init__(self): # Очередь для хранения коллбэков self.ready = collections.deque() def call_soon(self, callback, *args): # складывает кортэж из коллбэка и его аргументов в очередь self.ready.append((callback, args)) def run_until_complete(self, callback, *args): # Этот метод выполняет всё работу по запуску коллбэков self.call_soon(callback, *args) # Перекресток вех дорог - основной цикл # он крутится пока очередь не опустеет while self.ready: ntodo = len(self.ready) # внутренний цикл итерируется столько раз # сколько было коллбэков в очереди на момент его запуска for _ in range(ntodo): # на каждой интерации достаёт из очереди # один коллбэк и его параметры и запускает callback, args = self.ready.popleft() callback(*args) def callback(loop): print('Рассказчик') loop.call_soon(print, 'Читатель') loop = Loop()
loop.run_until_complete(c, loop)

Оседлав наш маленький коллбэк, мы отправляемся в путь через call_soon, попадаем в очередь и после краткого ожидания будем выведены на экран.

Эпизод про плохие коллбэки

Если не верите, покатайтесь той же дорогой на коллбэке maybe_print, приходящем к финишу примерно в половине случаев. Стоит упомянуть, что коллбэки это опасные лошадки — если они сбросят вас посреди дороги, интерпретатор питона не сможет помочь понять, где это произошло.

# ~/inside_asyncio/base_loop.py def maybe_print(msg): if random.randint(0, 1): raise Exception(msg) else: print(msg) def starting_point(loop): # Место посадки print('Рассказчик') loop.call_soon(maybe_print, 'Читатель') def main(loop): loop.call_soon(starting_point, loop) loop.call_soon(starting_point, loop) loop = Loop()
loop.run_until_complete(main, loop)

Из-за того, что функция maybe_print была запущена циклом событий, а не напрямую из starting_point, трейсбэк заканчивается именно на нём, в методе run_until_complete. Ниже показан полный трейсбэк запуска предыдущего примера. По такому трейсбэку невозможно определить, где в коде находится starting_point, что значительно усложнит отладку, если starting_point будут находится в нескольких местах кодовой базы.

$: python3 base_loop.py >> Рассказчик # Доехал первый раз
>> Читатель # Доехал первый раз
>> Рассказчик # Доехал второй раз
>> Traceback (most recent call last):
>> File "base_loop.py", line 42, in <module>
>> loop.run_until_complete(main, loop)
>> File "base_loop.py", line 17, in run_until_complete
>> callback(*args)
>> File "base_loop.py", line 29, in maybe_print
>> raise Exception(msg)
>> Exception: Читаель # не доехал второй раз

Например, на нём основана обработка исключений. Непрерывный стек вызовов нужен не только для вывода полного трейсбэка, но и для реализации других возможностей языка. Пример ниже не заработает, потому что к моменту запуска starting_point, функция main уже будет выполнена:

# ~/inside_asyncio/base_loop.py def main(loop): try: loop.call_soon(starting_point, loop) loop.call_soon(starting_point, loop) except: pass Loop().run_until_complete(main, loop)

Менеджер контекста в функции main откроет и закроет файл ещё до того, как будет запущена его обработка. Следующий пример тоже не заработает.

# ~/inside_asyncio/base_loop.py def main(loop): with open('file.txt', 'rb') as f: loop.call_soon(process_file, f) Loop().run_until_complete(main, loop)
# тут аналогия с путешествием достигла моего лимита, дальше без неё =(

Для частичного обхода этого недостатка в asyncio пришлось добавить много дополнительного кода, не относящeгося напрямую к решаемой ей задаче. Отсутствие непрерывного стека вызовов ограничивает использование привычных возможностей языка. Этот код, по большей части, отсутствует в примерах — они и без него довольно сложны.

Из цикла событий во внешний мир и обратно

Код, который умеет с ней работать, предоставляется модулем стандартной библиотеки под названием selectors. Цикл событий сообщается с внешним миром через операционную систему посредством событий. В примере ниже ожидаемым событием будет доступность сокета на чтение. Он позволяет сказать операционной системе, что мы ждем какого-то события, а потом спросить, произошло ли оно.

Цикл событий

# ~/inside_asyncio/event_loop.py import selectors
import socket
import collections
from future import Future
from handle import Handle
from task import Task class EventLoop: def __init__(self): self.ready = collections.deque() # Добавляем селектор self.selector = selectors.DefaultSelector() def add_reader(self, sock, callback): # Регистрируем ожидание доступности сокета на чтение # параметры: # сокет, # константа содержащая битовую маску доступности сокета на чтение # кортеж с данными которые мы хотим ассоциировать с этим событием self.selector.register( sock, socket.EVENT_READ, (self._accept_conn, sock, callback) ) def _accept_conn(self, sock, callback): # принимаем входящее соединение conn, addr = sock.accept() conn.setblocking(False) # регистрируем ожидание данных на сокете self.selector.register( conn, socket.EVENT_READ, (callback, conn) ) def run_until_complete(self, callback, *args): self.call_soon(callback, *args) # основной цикл крутится пока очередь не пустая или мы ожидаем каких-то событий while self.ready or self.selector.get_map(): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self.ready.popleft() callback(*args) # второй подцикл итерируется по наступившим событиям for key, events in self.selector.select(timeout=0): # достает коллбэк и аргументы из кортежа с ассоциированными данными callback, *args = key.data # добавляет их в очередь на выполнение self.call_soon(callback, *args) def call_soon(self, callback, *args): self.ready.append((callback, args)) def print_data(conn): print(conn.recv(1000)) def main(loop): # создаём сокет sock = socket.socket() # привязываем к локалхосту на 8086 порту sock.bind(('localhost', 8086)) sock.listen(100) sock.setblocking(False) # добавляем коллбэк для чтения данных loop.add_reader(sock, print_data) loop = EventLoop()
# запускаем цикл событий
loop.run_until_complete(main, loop)

Теперь стало возможным читать из сокета, используя цикл событий. Гонец из внешнего мира оставляет своё сообщение или посылку в селекторе, а селектор передаёт её получателю. Если запустить этот код и подключиться с помощью netcat, то он будет добросовестно выводить всё, что в него будет отправлено.

$: nc localhost 8086 $: python3 event_loop.py "Hi there!" b'"Hi there!"\n' "Hello!" b'"Hello!"\n' "Answer me, please!" b'"Answer me, please!"\n'

Для реализации этой функциональности используются две оставшиеся основные абстракции: Task и Future. В начале статьи говорилось, что asyncio — гибридная библиотека, в которой корутины работают поверх коллбэков. Далее будет показан код этих абстракций, а затем, как, используя их цикл событий, выполняются корутины.

Future

Он нужен для того, чтобы в корутине можно было дождаться завершения выполнения коллбэка и получить его результат. Ниже представлен код класса Future.

Future

# ~/inside_asyncio/future.py import sys
from asyncio import events, CancelledError class Future: # хранит состояние коллбэка результат выполнения которого представляет _state = 'PENDING' # FINISHED, CANCELLED # стек вызовов до того места где был создан экземпляр Future # нужен чтобы в случае возникновения исключения вывести понятный трейсбэк _source_traceback = None # список коллбэков которые должны быть вызваны когда изменится состояние ожидаемого коллбэка _callbacks = [] # исключение если оно было возбуждено во время выполения ожидаемого коллбэка _exception = None # цикл событий чтобы знать где запускать коллбэки на смену состояния _loop = None # результат выполнения ожидаемого коллбэка _result = None def __init__(self, loop): self._loop = loop self._source_traceback = events.extract_stack(sys._getframe(1)) def add_done_callback(self, callback): # добавляет коллбэки на смену состояния в список self._callbacks.append(callback) def _schedule_callbacks(self): # запускает коллбэки на смену состояния на цикле событий for callback in self._callbacks: self._loop.call_soon(callback, self) self._callbacks[:] = [] # Один из следующих трёх методов должен быть вызван для изменения состояния Future
# когда ожидаемый коллбэк каким-либо образом завершит свое выполнение def set_exception(self, exception): # в случае возникновения исключения сохраняем его self._exception = exception # меняем состояние self._state = 'FINISHED' # запускаем коллбэки на смену состояния self._schedule_callbacks() def set_result(self, result): # если ожидаемый коллбэк завершился успешно сохраняем результат выполнения self._result = result self._state = 'FINISHED' self._schedule_callbacks() def cancel(self): # в случае отмены просто меняем состояние self._state = 'CANCELLED' self._schedule_callbacks() def result(self): # метод для получения результат # возбуждает исключение если ожидание завершения коллбэка было отменено if self._state == 'CANCELLED': raise CancelledError # или оно возникло во время выполнения ожидаемого коллбэка if self._exception is not None: raise self._exception # иначе возвращает результат return self._result def __await__(self): # магический метод, вызывается ключевым словом await # если находимся в состоянии ожидания йилдим себя if self._state == 'PENDING': yield self # иначе пытаемся вернуть результат return self.result()

Task

Он нужен для запуска корутины на коллбэчном цикле событий. Это специальный подкласс класса Future.

Task

# ~/inside_asyncio/task.py from asyncio import futures
from future import Future class Task(Future): def __init__(self, coro, *, loop=None): super().__init__(loop=loop) # сохраняет выполняемую корутину self._coro = coro def _step(self, exc=None): # метод вызываемы циклом событий, нужен чтобы крутить корутину try: if exc is None: # если не получено исключение отправляем в корутину None # что заставляет её прокрутится на один шаг result = self._coro.send(None) else: # если получено исключение возбуждаем его в корутине self._coro.throw(exc) except StopIteration: result = None except Exception as exc: self.set_exception(exc) else: # если получили Future из корутины добавляем ей метод # wakeup как коллбэк на смену состояния if isinstance(result, Future): result.add_done_callback(self._wakeup) # иначе шедулим вызов метода step циклом событий еще раз elif result is None: self._loop.call_soon(self._step) def _wakeup(self, future): # метод с помощью которого Future возвращает поток выполнения в ожидающую её Task # с исключением try: future.result() except Exception as exc: self._step(exc) # или без в зависимости от успешности завершения Future else: self._step()

Цикл событий, умеющий работать с Future

EventLoop with Futures

# ~/inside_asyncio/future_event_loop.py import selectors
from selectors import EVENT_READ, EVENT_WRITE
import socket
import collections
from future import Future
from task import Task class EventLoop: def __init__(self): self._ready = collections.deque() self.selector = selectors.DefaultSelector() def run_until_complete(self, callback, *args): self.call_soon(callback, *args) while self._ready or self.selector.get_map(): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() callback(*args) for key, events in self.selector.select(timeout=0): callback, *args = key.data self.call_soon(callback, *args) def call_soon(self, callback, *args): self._ready.append((callback, args)) # Два метода умеющих работать с Future def sock_accept(self, sock, fut=None): # метод принимающий входящее соединение на сокете # создаёт Future если не получил её fut = fut if fut else Future(loop=self) try: # пытается принять входящее соединение conn, address = sock.accept() conn.setblocking(False) except (BlockingIOError, InterruptedError): # если входящего соединия нет # регистрирует сам себя на ожидание # передав свежесозданную Future в качестве параметра self.selector.register( sock, EVENT_READ, (self.sock_accept, sock, fut) ) except Exception as exc: fut.set_exception(exc) self.selector.unregister(sock) else: # если соединение установлено # вызывает метод Future для сохранения результата fut.set_result((conn, address)) self.selector.unregister(sock) return fut def sock_recv(self, sock, n, fut=None): # метод для получения данных из сокета # практичесски идентичен предыдущему за тем исключением, # что пытается не принять соединение, а получить данные из сокета fut = fut if fut else Future(loop=self) try: data = sock.recv(n) except (BlockingIOError, InterruptedError): self.selector.register( sock, EVENT_READ, (self.sock_recv, sock, n, fut) ) except Exception as exc: fut.set_exception(exc) self.selector.unregister(sock) else: fut.set_result(data) self.selector.unregister(sock) return fut async def main(loop): sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) # ожидаем входящего соединения conn, addr = await loop.sock_accept(sock) # получаем из него данные result = await loop.sock_recv(conn, 1000) print(result) loop = EventLoop()
# заворачиваем корутину в Task task = Task(coro=main(loop), loop=loop)
# шедулим метод степ для запуска на цикле событий
loop.run_until_complete(task._step)

Двинемся дальше

Теперь проследим за тем, как корутина main будет выполняться:

Выполнение

__________________________________________________________________
class EventLoop: def run_until_complete(self, callback, *args): # task._step добавляется в очередь self.call_soon(callback, *args) while self._ready or self.selector.get_map(): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() # и практически сразу вызывается callback(*args) # task._step()
___________________________________________________________________
clsss Task: def _step(self, exc=None): try: if exc is None: # отправляет None в корутину result = self._coro.send(None) else:
___________________________________________________________________
async def main(loop): # корутина прокручивается на один шаг # создаётся сокет sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) # вызывается метод цикла событий для ожидания входящего соединения conn, addr = await loop.sock_accept(sock) result = await loop.sock_recv(conn, 1000) print(result)
___________________________________________________________________
class EventLoop: def sock_accept(self, sock, fut=None): # создаёт экземпляр Future fut = fut if fut else Future(loop=self) try: # пытается принять входящее соединение conn, address = sock.accept() conn.setblocking(False) except (BlockingIOError, InterruptedError): # так как соединения нет # регистрирует сам себя на ожидание # передав свежесозданную Future в качестве параметра self.selector.register( sock, EVENT_READ, (self.sock_accept, sock, fut) ) except Exception as exc: -------------------------------------------- self.selector.unregister(sock) # возвращает Future в корутину return fut
___________________________________________________________________
async def main(loop): sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) # ключевое слово await вызывает метод __await__ полученной Future conn, addr = await loop.sock_accept(sock) result = await loop.sock_recv(conn, 1000) print(result)
___________________________________________________________________
class Future: def __await__(self): # так как Future находится в состоянии ожидания она йилдит саму себя if self._state == 'PENDING': yield self return self.result()
___________________________________________________________________
class Task(Future): def _step(self, exc=None): try: if exc is None: # результат йилда пробрасывается напрямую в то место откуда ворутину пришел None result = self._coro.send(None) # result = fut -------------------------------- else: # получили Future из корутины добавляем ей метод # wakeup как коллбэк на смену состояния if isinstance(result, Future): result.add_done_callback(self._wakeup) elif result is None: self._loop.call_soon(self._step) # тут выполнение данной корутины останавливается - крутящие её эксземпляры Task и Future
# ждут входящего соединения
# если бы в очереди были другие коллбэки цикл событий бы переключился на их выполнение
___________________________________________________________________
class EventLoop: def run_until_complete(self, callback, *args): self.call_soon(callback, *args) while self._ready or self.selector.get_map(): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() callback(*args) for key, events in self.selector.select(timeout=0): # пришло входящее соединение callback, *args = key.data self.call_soon(callback, *args) # loop.sock_accept(sock, fut)
___________________________________________________________________
class EventLoop: def sock_accept(self, sock, fut=None): fut = fut if fut else Future(loop=self) try: # принимаем входящее соединение conn, address = sock.accept() conn.setblocking(False) except (BlockingIOError, InterruptedError): -------------------------------- else: # устанавливаем результат Future fut.set_result((conn, address)) self.selector.unregister(sock) return fut
___________________________________________________________________
class Future: def set_result(self, result): # устанавливает результат self._result = result # меняет состояние self._state = 'FINISHED' # вызывает коллбэки на смену состояния self._schedule_callbacks() def _schedule_callbacks(self): for callback in self._callbacks: # у нас только один коллбэк на смену состояния task.wakeup self._loop.call_soon(callback, self) # (task.wakeup, fut) self._callbacks[:] = []
___________________________________________________________________
class EventLoop: def run_until_complete(self, callback, *args): self.call_soon(callback, *args) while self._ready or self.selector.get_map(): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() # на следующей итерации главного цикла # будет вызван метод task.wakeup callback(*args) # task.wakeup(fut)
___________________________________________________________________
class Task(Future): def _wakeup(self, future): try: future.result() except Exception as exc: self._step(exc) else: # так как Future завершилась успешно он вызовет метод task._step self._step() def _step(self, exc=None): try: if exc is None: # который отправит в корутину ещё один None result = self._coro.send(None) else:
___________________________________________________________________
async def main(loop): sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) # ключевое слово await вызывает метод __awai__ второй раз conn, addr = await loop.sock_accept(sock) result = await loop.sock_recv(conn, 1000) print(result)
___________________________________________________________________
class Future: def __await__(self): if self._state == 'PENDING': yield self # так как Future завершена возвращаем результат return self.result()
___________________________________________________________________
async def main(loop): sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) # результат возвращенный из Future помещается в переменные conn и addr conn, addr = await loop.sock_accept(sock) result = await loop.sock_recv(conn, 1000) print(result)

Вот таким нехитрым способом asyncio выполняет корутины.

Итоги

Она не только решила проблему совместимости, но и вызвала огромный рост интереса к конкурентному программированию в сообществе. Цель создания asyncio была успешно достигнута. Кроме того, asyncio повлияла и на сам язык: в него были добавлены нативные корутины и новые ключевые слова async/await. Новые статьи и библиотеки начали появляться, словно грибы после дождя. В предыдущий раз новое ключевое слово добавлялось в далеком 2003 году, это было ключевое слово yield.

Из этой цели логически вытекает выбор инструментов: если бы не было требования совместимости, возможно, корутинам была бы отдана главная роль. Один из целей создания asyncio было обеспечить предельно простую интеграцию в уже существовавшие асинхронные фреймворки (Twisted, Tornado, Gevent). Из-за того, что при программировании на коллбэках невозможно сохранить непрерывный стэк вызовов, на границе между ними и корутинами пришлось создать дополнительную систему, обеспечивающую поддержку опирающихся на него возможностей языка.

Зачем всё это знать простому пользователю библиотеки, который следует рекомендациям из документации и использует лишь корутины и высокоуровневый API?
Вот кусок документации класса StreamWriter Теперь главный вопрос.

И эти коллбэки из него торчат. Его экземпляр возвращается функцией asyncio.open_connection и является async/await API поверх API на коллбэках. Корутина drain нужна для того, чтобы обеспечить возможность дождаться, пока количество данных в буфере не опустится до заданного значения. Функции write и writelines синхронные, они пытаются писать в сокет, а если не получается, то сбрасывают данные в нижележащий буфер и добавляют коллбэки на запись.

Однако, если помнить об этом, то остается пара неприятных моментов. Если забыть вызвать drain между вызовами write, то внутренний буфер может разрастись до неприличных размеров. Второй: если корутина «сломается», то коллбэк на запись никак об этом не узнает и продолжит писать данные из буфера. Первый: если коллбэк на запись «сломается», то корутина, использующая этот API никак об этом не узнает и, соответственно, не сможет обработать.

Таким образом, даже используя только корутины, будьте готовы к тому, что коллбэки напомнят о себе.

О том, как работать с базами данных из асинхронного кода, вы можете прочитать в этой статье нашего корпоративного блога Antida software.

Показать больше

Похожие публикации

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *

Кнопка «Наверх»