Хабрахабр

[Из песочницы] Паттерны корутин asyncio: за пределами await

Мне попалась статья Yeray Diaz "Asyncio Coroutine Patterns: Beyond await", в которой автор весьма увлекательно рассматривает применение asyncio и делится некоторыми приемами. Предисловие переводчика:
В очередной раз наступив на грабли при работе с python asyncio я отправился на просторы интернета, чтобы найти что-то более приятное, чем сухая документация. Поскольку я не нашел ничего такого же цельного на русском языке, то решился её перевести.

Это очередной импорт библиотеки антигравитации: import antigravity Asyncio — конкурентная мечта python программиста: пишешь код, граничащий с синхронным, и позволяешь Python сделать все остальное.

Печально. На самом деле все совсем не так, конкурентное программирование — тяжелое занятие и, пока корутины позволяют нам избегать ада обратных вызовов, что может увести вас достаточно далеко, вам все еще нужно думать о создании задач, получении результатов и элегантном перехвате исключений.

Плохие новости в том, что не всегда сразу очевидно что неправильно и как это исправить. Хорошие новости в том, что все из этого возможно в asyncio. Ниже несколько паттернов, которые я обнаружил во время работы с asyncio.

Прежде чем мы начнем:

Следуя отклику на мою предыдущую статью, я также использовал async/await синтаксис введенный в Python 3. Я использовал любимую библиотеку aiohttp для выполнения асинхронных HTTP запросов и Hacker News API, потому что это простой и хорошо известный сайт, который придерживается знакомого сценария использования. Я предполагал, что читатель знаком с идеями, которые здесь описаны. 5. И в конечном счете, все примеры доступны в GitHub репозитории этой статьи.

Хорошо, давайте начнем!

Рекурсивные корутины

Для таких задач API включает несколько методов в классе AbstractEventLoop, а также функции в библиотеке. Создание и запуск задач тривиально в asyncio. Но обычно вы хотите комбинировать результаты от этих задач и обрабатывать их каким-то образом, и рекурсия — это отличный пример данной схемы, а также демонстрирует простоту корутин в сравнении с остальными средствами конкурентности.

Представьте, что мы просто слишком заняты чтобы проверять HackerNews, или может быть вам просто нравится хороший холивар, так что вы хотите реализовать систему, которая извлекает число комментариев для конкретного постна HN и, если оно выше порога, уведомляет вас. Обычный случай для использования asyncio — это создание вебкраулера какого-то вида. Вы немного погуглили и нашли документацию на HN API, просто то что нужно, однако вы заметили в документации следующее:

Обойдите дерево и сосчитайте их. Хотите узнать общее число комментариев статьи?

Вызов принят!

""" Рекурсивная функция решает проблему упрощая входные данные до тех пор, пока мы не достигнем основного тривиального случая, а затем объединяет результаты наверху стека. Предположим, что мы хотим посчитать число комментариев определенного поста Hacker News рекурсивно агрегируя число его потомков """ import asyncio
import argparse
import logging
from urllib.parse import urlparse, parse_qs
from datetime import datetime import aiohttp
import async_timeout LOGGER_FORMAT = '%(asctime)s %(message)s'
URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/.json"
FETCH_TIMEOUT = 10 parser = argparse.ArgumentParser(description='Calculate the comments of a Hacker News post.')
parser.add_argument('--id', type=int, default=8863, help='ID of the post in HN, defaults to 8863')
parser.add_argument('--url', type=str, help='URL of a post in HN')
parser.add_argument('--verbose', action='store_true', help='Detailed output') logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]')
log = logging.getLogger()
log.setLevel(logging.INFO) fetch_counter = 0 async def fetch(session, url): """ Получаем URL страницу используя aiohttp, который возвращает распарсерный JSON ответ. Как полагается в документации aiohttp мы используем сессию повторно. """ global fetch_counter with async_timeout.timeout(FETCH_TIMEOUT): fetch_counter += 1 async with session.get(url) as response: return await response.json() async def post_number_of_comments(loop, session, post_id): """ Извлекаем данные текущего поста и рекурсивно извлечем для всех комментариев. """ url = URL_TEMPLATE.format(post_id) now = datetime.now() response = await fetch(session, url) log.debug('{:^6} > Fetching of {} took {} seconds'.format( post_id, url, (datetime.now() - now).total_seconds())) if 'kids' not in response: # Базовый случай. Без комментариев return 0 # считаем число комментариев этого поста, как количество комментариев (на первом уровне) number_of_comments = len(response['kids']) # создаем рекурсивные задачи для всех комментариев log.debug('{:^6} > Fetching {} child posts'.format( post_id, number_of_comments)) tasks = [post_number_of_comments(loop, session, kid_id) for kid_id in response['kids']] # запустим задачи и получим результат results = await asyncio.gather(*tasks) # сводим нижлежащие комментарии и добавляем их к числу комментариев этого поста number_of_comments += sum(results) log.debug('{:^6} > {} comments'.format(post_id, number_of_comments)) return number_of_comments def id_from_HN_url(url): """ Вернем значение `id` URL аргумента запроса если он передан или None. """ parse_result = urlparse(url) try: return parse_qs(parse_result.query)['id'][0] except (KeyError, IndexError): return None if __name__ == '__main__': args = parser.parse_args() if args.verbose: log.setLevel(logging.DEBUG) post_id = id_from_HN_url(args.url) if args.url else args.id loop = asyncio.get_event_loop() with aiohttp.ClientSession(loop=loop) as session: now = datetime.now() comments = loop.run_until_complete( post_number_of_comments(loop, session, post_id)) log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_counter)) log.info("-- Post {} has {} comments".format(post_id, comments)) loop.close()

Не стесняйтесь попробовать запустить скрипт с флагом “ —verbose” для более детального вывода.

[14:47:32] > Calculating comments took 2.23 seconds and 73 fetches [14:47:32] -- Post 8863 has 72 comments

Заметим, что этот код читается практически полностью так, как было бы в случае с синхронным кодом. Давайте пропустим шаблонный код и перейдем прямо к рекурсивной корутине.

async def post_number_of_comments(loop, session, post_id): """ Извлекаем данные текущего поста и рекурсивно извлечем для всех комментариев. """ url = URL_TEMPLATE.format(post_id) now = datetime.now() response = await fetch(session, url) log.debug('{:^6} > Fetching of {} took {} seconds'.format( post_id, url, (datetime.now() - now).total_seconds())) if 'kids' not in response: # base case, there are no comments return 0 # считаем число комментариев этого поста, как количество комментариев (на первом уровне) number_of_comments = len(response['kids']) # создаем рекурсивные задачи для всех комментариев log.debug('{:^6} > Fetching {} child posts'.format( post_id, number_of_comments)) tasks = [post_number_of_comments( loop, session, kid_id) for kid_id in response['kids']] # запустим задачи и получим результат results = await asyncio.gather(*tasks) # сведем комментарии наследников и добавим к количеству комментариев поста number_of_comments += sum(results) log.debug('{:^6} > {} comments'.format(post_id, number_of_comments)) return number_of_comments

  1. Сначала получим JSON с данными поста.
  2. Рекурсивно обойдем каждого из наследников.
  3. В конце концов достигнем базового случая и вернем ноль,
    когда у поста нет откликов.
  4. При возврате от базового случая прибавим ответы на текущий пост
    к числу наследников и вернем

Это отличный пример того, что Бретт Слаткин описывает как fan-in и fan-out, мы fan-out чтобы получить данные от наследников и fan-in сводим полученные данные, чтобы расчитать число комментариев

Здесь я использую функцию gather, которая эффективно ожидает пока все корутины выполнятся и вернут список своих результатов. В API asyncio есть пару способов для того, чтобы выполнять эти fan-out операции.

Это позволяет нам выразить довольно сложное поведение в одной изящной и (легко) читаемой корутине. Обратим внимание на то, как использование корутины также хорошо соответствует рекурсии с одной любой точкой, в которой присутствует любое число корутин, ожидающих ответы на свои запросы во время вызова функции gather и возобновляющих выполнение после того, как операция ввода/вывода завершится.

Ладно, давайте поднимемся на ступеньку выше. "Очень просто" — скажете вы?

Выстрелил и забыл

Мы можем просто добавить выражение "if" в конец рекурсивной функции чтобы добиться этого: Представьте, что вы хотите отправлять себе сообщение на электронную почту с постами, у которых количество комментариев больше определенного значения, и вы хотите сделать это таким же способом как мы обходили дерево постов.

async def post_number_of_comments(loop, session, post_id): """ Получаем данные для текущего поста и рекурсивно для всех его комментариев. """ url = URL_TEMPLATE.format(post_id) response = await fetch(session, url) if 'kids' not in response: # Базовый случай. Без комментариев return 0 # считаем комментарии этого поста как число комментариев. number_of_comments = len(response['kids']) # создаем рекурсивные задачи для всех комментариев tasks = [post_number_of_comments(loop, session, kid_id) for kid_id in response['kids']] # запускаем задачи и получаем результаты results = await asyncio.gather(*tasks) # сводим нижлежащие комментарии и добавляем их к числу комментариев этого поста number_of_comments += sum(results) log.debug('{:^6} > {} comments'.format(post_id, number_of_comments)) # Логируем если число комментариев больше порогового значения if number_of_comments > MIN_COMMENTS: await log_post(response) return number_of_comments async def log_post(post): """ Симуляция логирования поста. """ await asyncio.sleep(random() * 3) log.info("Post logged")

Это в последний раз. Да, я использовал asyncio.sleep. Обещаю.

[09:41:02] Post logged
[09:41:04] Post logged
[09:41:06] Post logged
[09:41:06] > Calculating comments took 6.35 seconds and 73 fetches
[09:41:06] -- Post 8863 has 72 comments

Это значительно медленнее чем раньше!
Причина в том, что, как мы обсуждали ранее, await приостанавливает выполнение корутины до тех пор, пока future не будет выполнена, но, поскольку нам не нужен результат логирования, нет реальной причины поступать таким образом.

Быстро взглянув на API asyncio найдем функцию ensure_future, которая запланирует запуск корутины, обернет её в объект Task и вернёт. Нам нужно "выстрелить и забыть" нашей корутиной, а поскольку мы не можем ждать ее завершения используя await, нам нужен другой путь для запуска выполнения корутины без ее ожидания. Помня, что раньше корутина была запланирована, цикл событий будет контролировать результат работы нашей корутины в какой-то момент в будущем, когда другая корутина будет в состоянии ожидания.

Здорово, давайте заменим await log_post следующим образом:

async def post_number_of_comments(loop, session, post_id): """ Получаем данные для текущего поста и рекурсивно для всех его комментариев. """ url = URL_TEMPLATE.format(post_id) response = await fetch(session, url) if 'kids' not in response: # base case, there are no comments return 0 # считаем комментарии этого поста как число комментариев. number_of_comments = len(response['kids']) # создаем рекурсивные задачи для всех комментариев tasks = [post_number_of_comments( loop, session, kid_id) for kid_id in response['kids']] # планируем задачи и получаем результаты results = await asyncio.gather(*tasks) # сводим нижлежащие комментарии и добавляем их к числу комментариев этого поста number_of_comments += sum(results) log.debug('{:^6} > {} comments'.format(post_id, number_of_comments)) # Логируем если число комментариев больше порогового значения if number_of_comments > MIN_COMMENTS: asyncio.ensure_future(log_post(response)) return number_of_comments

[09:42:57] > Calculating comments took 1.69 seconds and 73 fetches
[09:42:57] -- Post 8863 has 72 comments
[09:42:57] Task was destroyed but it is pending!
task: <Task pending coro=<log_post() done, defined at 02_fire_and_forget.py:82> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1109197f8>()]>>
[09:42:57] Task was destroyed but it is pending!
task: <Task pending coro=<log_post() done, defined at 02_fire_and_forget.py:82> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x110919948>()]>>
[09:42:57] Task was destroyed but it is pending!
task: <Task pending coro=<log_post() done, defined at 02_fire_and_forget.py:82> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x110919978>()]>>

преследующее пользователей asyncio по всему миру. Кхм, устрашающее Task was destroyed but it is pending! 69 с.), плохие новости в том, что asyncio не нравится выход за пределы "выстрелил-и-забыл". Хорошие новости в том, что мы вернулись ко времени, которое получили ранее (1.

Проблема в том, что мы принудительно закрываем цикл событий после того, как получаем результат работы корутины post_number_of_comments, не оставляя нашей задаче log_post времени для завершения.

У нас есть две возможности:
мы либо позволяем циклу событий работать бесконечно, используя run_forever, и вручную прерываем работу скрипта, или мы используем метод all_tasks класса Task для того, чтобы найти все работающие задачи и ждем когда закончится расчет количества комментариев.

Давайте попробуем выйти из этой ситуации быстро внеся изменения после нашего вызова post_number_of_comments:

if __name__ == '__main__': args = parser.parse_args() if args.verbose: log.setLevel(logging.DEBUG) post_id = id_from_HN_url(args.url) if args.url else args.id loop = asyncio.get_event_loop() with aiohttp.ClientSession(loop=loop) as session: now = datetime.now() comments = loop.run_until_complete( post_number_of_comments(loop, session, post_id)) log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_counter)) log.info("-- Post {} has {} comments".format(post_id, comments)) pending_tasks = [ task for task in asyncio.Task.all_tasks() if not task.done()] loop.run_until_complete(asyncio.gather(*pending_tasks)) loop.close()

[09:47:29] > Calculating comments took 1.72 seconds and 73 fetches
[09:47:29] — Post 8863 has 72 comments
[09:47:30] Post logged
[09:47:31] Post logged
[09:47:32] Post logged

Сейчас мы уверены, что логирующие задачи завершены.
Предположение, что метод all_tasks отлично работает в случаях, с которыми мы имеем дело, — отличная идея, когда задачи выполняются соответствующим образом в нашем цикле событий, но в более сложных случаях может быть любое количество выполняющихся задач, источник которых может находится за пределами нашего кода.

Как вы знаете функция ensure_future возвращает объект Task, Мы можем использовать это для регистрации наших задач с низким приоритетом. Другой подход — навести порядок после того, как мы самостоятельно регистрируем абсолютно все корутины, которые мы планировали запустить и позволяем выполнится, отложенным ранее,
как только завершится подсчет комментариев. Давайте просто определим список task_registry и сохраним в нем futures:

async def post_number_of_comments(loop, session, post_id): """Retrieve data for current post and recursively for all comments. """ url = URL_TEMPLATE.format(post_id) response = await fetch(session, url) if 'kids' not in response: # base case, there are no comments return 0 # calculate this post's comments as number of comments number_of_comments = len(response['kids']) # create recursive tasks for all comments tasks = [post_number_of_comments( loop, session, kid_id) for kid_id in response['kids']] # schedule the tasks and retrieve results results = await asyncio.gather(*tasks) # reduce the descendents comments and add it to this post's number_of_comments += sum(results) log.debug('{:^6} > {} comments'.format(post_id, number_of_comments)) # Log if number of comments is over a threshold if number_of_comments > MIN_COMMENTS: # Add the future to the registry task_registry.append(asyncio.ensure_future(log_post(response))) return number_of_comments # (... ommitted code ...) # if __name__ == '__main__': args = parser.parse_args() if args.verbose: log.setLevel(logging.DEBUG) post_id = id_from_HN_url(args.url) if args.url else args.id loop = asyncio.get_event_loop() task_registry = [] # define our task registry with aiohttp.ClientSession(loop=loop) as session: now = datetime.now() comments = loop.run_until_complete( post_number_of_comments(loop, session, post_id)) log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_counter)) log.info("-- Post {} has {} comments".format(post_id, comments)) pending_tasks = [task for task in task_registry if not task.done()] loop.run_until_complete(asyncio.gather(*pending_tasks)) loop.close()

[09:53:46] > Calculating comments took 1.68 seconds and 73 fetches
[09:53:46] — Post 8863 has 72 comments
[09:53:46] Post logged
[09:53:48] Post logged
[09:53:49] Post logged

Все задачи запускаются в одном потоке и циклом событий необходимо управлять соответственно позволяя выделить время для завершения задач. Мы извлекаем следующий урок — asyncio не следует рассматривать как распределенную очередь задач типа Celery.

Что приводит к другому общепринятому паттерну:

Периодически запускаемые корутины

Продолжая с нашим примером о HN(и мы ранее провели отличную работу), мы решили,
что решительно важно рассчитывать число комментариев к публикации HN как только они становятся доступны и пока они находятся в списке 5 последних записей.

Отлично, так мы можем просто опрашивать эту конечную точку для получения новых публикаций и расчета числа комментариев к ним, скажем раз в пять секунд. Быстрый взгляд на API HN показывает конечную точку, которая возвращает 500 последних записей.

Я внес несколько незначительных изменений чтобы получить топ записей вместо обращения по непосредственному URL поста. Хорошо, поскольку теперь мы переходим к периодическому опросу, мы можем просто использовать бесконечный while цикл, ожидать выполнения задачи опроса (вызывать await), и засыпать(вызывать sleep) на необходимый промежуток времени.

"""
An example of periodically scheduling coroutines using an infinite loop of
awaiting and sleeping. """ import asyncio
import argparse
import logging
from datetime import datetime import aiohttp
import async_timeout LOGGER_FORMAT = '%(asctime)s %(message)s'
URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/{}.json"
TOP_STORIES_URL = "https://hacker-news.firebaseio.com/v0/topstories.json"
FETCH_TIMEOUT = 10 parser = argparse.ArgumentParser(description='Calculate the number of comments of the top stories in HN.')
parser.add_argument('--period', type=int, default=5, help='Number of seconds between poll')
parser.add_argument('--limit', type=int, default=5,help='Number of new stories to calculate comments for')
parser.add_argument('--verbose', action='store_true', help='Detailed output') logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]')
log = logging.getLogger()
log.setLevel(logging.INFO) fetch_counter = 0 async def fetch(session, url): """ Получаем URL страницу используя aiohttp, который возвращает распарсенный JSON ответ. Как полагается в документации aiohttp мы используем сессию повторно. """ global fetch_counter with async_timeout.timeout(FETCH_TIMEOUT): fetch_counter += 1 async with session.get(url) as response: return await response.json() async def post_number_of_comments(loop, session, post_id): """ Извлекаем данные текущего поста и рекурсивно извлечем для всех комментариев. """ url = URL_TEMPLATE.format(post_id) response = await fetch(session, url) if 'kids' not in response: # base case, there are no comments return 0 # считаем число комментариев этого поста, как количество комментариев (на первом уровне) number_of_comments = len(response['kids']) # создаем рекурсивные задачи для всех комментариев tasks = [post_number_of_comments( loop, session, kid_id) for kid_id in response['kids']] # планируем задачи и получаем результаты results = await asyncio.gather(*tasks) # сводим нижлежащие комментарии и добавляем их к числу комментариев этого поста number_of_comments += sum(results) log.debug('{:^6} > {} comments'.format(post_id, number_of_comments)) return number_of_comments async def get_comments_of_top_stories(loop, session, limit, iteration): """ Получаем последние публикации на HN. """ response = await fetch(session, TOP_STORIES_URL) tasks = [post_number_of_comments(loop, session, post_id) for post_id in response[:limit]] results = await asyncio.gather(*tasks) for post_id, num_comments in zip(response[:limit], results): log.info("Post {} has {} comments ({})".format(post_id, num_comments, iteration)) async def poll_top_stories_for_comments(loop, session, period, limit): """ Периодически опрашиваем для получения новых статей и числа комментариев. """ global fetch_counter iteration = 1 while True: now = datetime.now() log.info("Calculating comments for top {} stories. ({})".format(limit, iteration)) await get_comments_of_top_stories(loop, session, limit, iteration) log.info('> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_counter)) log.info("Waiting for {} seconds...".format(period)) iteration += 1 fetch_counter = 0 await asyncio.sleep(period) if __name__ == '__main__': args = parser.parse_args() if args.verbose: log.setLevel(logging.DEBUG) loop = asyncio.get_event_loop() with aiohttp.ClientSession(loop=loop) as session: loop.run_until_complete( poll_top_stories_for_comments( loop, session, args.period, args.limit)) loop.close()

[10:14:03] Calculating comments for top 5 stories. (1)
[10:14:06] Post 13848196 has 31 comments (1)
[10:14:06] Post 13849430 has 37 comments (1)
[10:14:06] Post 13849037 has 15 comments (1)
[10:14:06] Post 13845337 has 128 comments (1)
[10:14:06] Post 13847465 has 27 comments (1)
[10:14:06] > Calculating comments took 2.96 seconds and 244 fetches
[10:14:06] Waiting for 5 seconds…
[10:14:11] Calculating comments for top 5 stories. (2)
[10:14:14] Post 13848196 has 31 comments (2)
[10:14:14] Post 13849430 has 37 comments (2)
[10:14:14] Post 13849037 has 15 comments (2)
[10:14:14] Post 13845337 has 128 comments (2)
[10:14:14] Post 13847465 has 27 comments (2)
[10:14:14] > Calculating comments took 3.04 seconds and 244 fetches
[10:14:14] Waiting for 5 seconds…

Снова последствия использования await и блокирование пока мы не получим наши результаты обратно.
Эти особенности не создают проблемы в случае, когда задаче требуется больше времени чем пять секунд. Отлично, но есть незначительная проблема: если вы обратили внимание на временную отметку,
то задача не запускается строго раз в 5 секунд, она запускается через 5 секунд после того как _завершится выполнение get_comments_of_topstories. Также, кажется ошибочным использовать _run_untilcomplete когда корутина спроектирована как бесконечная.

Хорошие новости в том, что теперь мы эксперты по _ensurefuture, и можем просто впихнуть ее в код вместо использования await...

async def poll_top_stories_for_comments(loop, session, period, limit): """ Периодически опрашиваем для получения новых статей и числа комментариев. """ global fetch_counter iteration = 1 while True: now = datetime.now() log.info("Calculating comments for top {} stories. ({})".format( limit, iteration)) asyncio.ensure_future( get_comments_of_top_stories(loop, session, limit, iteration)) log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_counter)) log.info("Waiting for {} seconds...".format(period)) iteration += 1 fetch_counter = 0 await asyncio.sleep(period)

[10:55:40] Calculating comments for top 5 stories. (1) [10:55:40] > Calculating comments took 0.00 seconds and 0 fetches [10:55:40] Waiting for 5 seconds… [10:55:43] Post 13848196 has 32 comments (1) [10:55:43] Post 13849430 has 48 comments (1) [10:55:43] Post 13849037 has 16 comments (1) [10:55:43] Post 13845337 has 129 comments (1) [10:55:43] Post 13847465 has 29 comments (1) [10:55:45] Calculating comments for top 5 stories. (2) [10:55:45] > Calculating comments took 0.00 seconds and 260 fetches [10:55:45] Waiting for 5 seconds… [10:55:48] Post 13848196 has 32 comments (2) [10:55:48] Post 13849430 has 48 comments (2) [10:55:48] Post 13849037 has 16 comments (2) [10:55:48] Post 13845337 has 129 comments (2) [10:55:48] Post 13847465 has 29 comments (2)

00 секунд и нет выборок И затем следующая итерация занимает ноль секунд и 260 выборок? Кхм… Ладно, хорошие новости в том, что временная отметка располагается точно через пять секунд, но что за 0.

Это довольно мелкие задачи, поскольку мы можем жить без сообщений, но что если нам нужны результаты выполнения задач? Это одно из последствий ухода от await, теперь мы больше не блокируем корутину и просто переходим на следующую строку, которая печатает ноль секунд и, в первый раз, ноль извлеченных сообщений.

Тогда, мой друг, нам нужно прибегнуть к… callback-ам (поеживаемся((( )

Мы больше не на территории await, у нас приключения с ручным запуском задач что приводит к нашему сценарию использования. Я знаю, знаю, весь смысл корутин в том, чтобы избежать callback — ов, но это потому, что драматический подзаголовок статьи — "За пределами await". spoiler Что это вам дает?

Как мы обсуждали ранее _ensurefuture возвращает объект Future, к которому мы мы можем добавить callback используя _add_donecallback.

В таком случае создаем экземпляр для каждой задачи, чтобы у нас был корректный подсчет выборок. Прежде чем мы это сделаем, и чтобы иметь корректный подсчет выборок(fetches) мы приходим к тому, что мы должны инкапсулировать нашу корутину извлечения в класс URLFetcher. Также удаляем глобальную переменную, которая все равно вносила баг:

""" Пример периодического запуска корутин с использованием бесконечного цикла событий для запуска задач при помощи ensure_future и вызова sleep. Добавляя обратный вызов к объекту future, который возвращает функция ensure_future, мы можем корректно выводить статистику прошедшего времени и выборок, используя новый класс URLFetcher Но читаемость ухудшается. """ import asyncio
import argparse
import logging
from datetime import datetime import aiohttp
import async_timeout LOGGER_FORMAT = '%(asctime)s %(message)s'
URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/{}.json"
TOP_STORIES_URL = "https://hacker-news.firebaseio.com/v0/topstories.json"
FETCH_TIMEOUT = 10 parser = argparse.ArgumentParser(description='Calculate the number of comments of the top stories in HN.')
parser.add_argument('--period', type=int, default=5, help='Number of seconds between poll')
parser.add_argument('--limit', type=int, default=5, help='Number of new stories to calculate comments for')
parser.add_argument('--verbose', action='store_true', help='Detailed output') logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]')
log = logging.getLogger()
log.setLevel(logging.INFO) class URLFetcher(): """ Обеспечивает подсчет URL выборок для определенной задачи """ def __init__(self): self.fetch_counter = 0 async def fetch(self, session, url): """ Получаем URL страницу используя aiohttp, который возвращает распарсерный JSON ответ. Как полагается в документации aiohttp мы используем сессию повторно. """ with async_timeout.timeout(FETCH_TIMEOUT): self.fetch_counter += 1 async with session.get(url) as response: return await response.json() async def post_number_of_comments(loop, session, fetcher, post_id): """ Извлекаем данные текущего поста и рекурсивно извлечем для всех комментариев. """ url = URL_TEMPLATE.format(post_id) response = await fetcher.fetch(session, url) # Базовый случай. Нет комментариев. if response is None or 'kids' not in response: return 0 # считаем число комментариев этого поста, как количество комментариев (на первом уровне) number_of_comments = len(response['kids']) # создаем рекурсивные задачи для всех комментариев tasks = [post_number_of_comments( loop, session, fetcher, kid_id) for kid_id in response['kids']] # sпланируем задачи и получаем результаты results = await asyncio.gather(*tasks) # сводим нижлежащие комментарии и добавляем их к числу комментариев этого поста number_of_comments += sum(results) log.debug('{:^6} > {} comments'.format(post_id, number_of_comments)) return number_of_comments async def get_comments_of_top_stories(loop, session, limit, iteration): """ Получаем топ публикаций HN. """ fetcher = URLFetcher() # create a new fetcher for this task response = await fetcher.fetch(session, TOP_STORIES_URL) tasks = [post_number_of_comments( loop, session, fetcher, post_id) for post_id in response[:limit]] results = await asyncio.gather(*tasks) for post_id, num_comments in zip(response[:limit], results): log.info("Post {} has {} comments ({})".format( post_id, num_comments, iteration)) return fetcher.fetch_counter # return the fetch count async def poll_top_stories_for_comments(loop, session, period, limit): """ Периодически опрашиваем для получения новых статей и числа комментариев. """ iteration = 1 while True: log.info("Calculating comments for top {} stories. ({})".format(limit, iteration)) future = asyncio.ensure_future(get_comments_of_top_stories(loop, session, limit, iteration)) now = datetime.now() def callback(fut): fetch_count = fut.result() log.info('> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_count)) future.add_done_callback(callback) log.info("Waiting for {} seconds...".format(period)) iteration += 1 await asyncio.sleep(period) if __name__ == '__main__': args = parser.parse_args() if args.verbose: log.setLevel(logging.DEBUG) loop = asyncio.get_event_loop() with aiohttp.ClientSession(loop=loop) as session: loop.run_until_complete( poll_top_stories_for_comments( loop, session, args.period, args.limit)) loop.close()

[12:23:40] Calculating comments for top 5 stories. (1) [12:23:40] Waiting for 5 seconds... [12:23:43] Post 13848196 has 38 comments (1) [12:23:43] Post 13849430 has 72 comments (1) [12:23:43] Post 13849037 has 19 comments (1) [12:23:43] Post 13848283 has 64 comments (1) [12:23:43] Post 13847465 has 34 comments (1) [12:23:43] > Calculating comments took 3.17 seconds and 233 fetches [12:23:45] Calculating comments for top 5 stories. (2) [12:23:45] Waiting for 5 seconds... [12:23:47] Post 13848196 has 38 comments (2) [12:23:47] Post 13849430 has 72 comments (2) [12:23:47] Post 13849037 has 19 comments (2) [12:23:47] Post 13848283 has 64 comments (2) [12:23:47] Post 13847465 has 34 comments (2) [12:23:47] > Calculating comments took 2.47 seconds and 233 fetches [12:23:50] Calculating comments for top 5 stories. (3) [12:23:50] Waiting for 5 seconds...

Хорошо, уже лучше, но давайте сфокусируемся на секции callback:

async def poll_top_stories_for_comments(loop, session, period, limit): """ Периодически опрашиваем для получения новых статей и числа комментариев. """ iteration = 1 while True: log.info("Calculating comments for top {} stories. ({})".format( limit, iteration)) future = asyncio.ensure_future( get_comments_of_top_stories(loop, session, limit, iteration)) now = datetime.now() def callback(fut): fetch_count = fut.result() log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_count)) future.add_done_callback(callback) log.info("Waiting for {} seconds...".format(period)) iteration += 1 await asyncio.sleep(period)

Мы также возвращаем количество выборок(fetch) из экземпляра URLFetcher как результат _get_comments_of_topstories и получаем эти данные как результат future. Обратим внимание на то, что callback функции необходимо принимать один аргумент, в котором передается сам объект future.

Я говорил вам, что это будет неплохо, зато здесь точно нет await. Видите?

И вы будете правы, их можно использовать, нам просто нужно внести изменения в poll_top_stories_for_comments: Пока мы обсуждаем callback-и, при неизбежных скитаниях по документации API asyncio вам посчастливилось найти пару методов в AbstractBaseLoop с именами _calllater и _callat,
которые выглядят как что-то удобное для реализации периодической корутины.


def poll_top_stories_for_comments(loop, session, period, limit, iteration=0): """ Периодическая функция для запуска get_comments_of_top_stories. """ log.info("Calculating comments for top {} stories ({})".format( limit, iteration)) future = asyncio.ensure_future( get_comments_of_top_stories(loop, session, limit, iteration)) now = datetime.now() def callback(fut): fetch_count = fut.result() log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_count)) future.add_done_callback(callback) log.info("Waiting for {} seconds...".format(period)) iteration += 1 loop.call_later( period, partial( # or call_at(loop.time() + period) poll_top_stories_for_comments, loop, session, period, limit, iteration ) ) if __name__ == '__main__': args = parser.parse_args() if args.verbose: log.setLevel(logging.DEBUG) loop = asyncio.get_event_loop() with aiohttp.ClientSession(loop=loop) as session: poll_top_stories_for_comments( loop, session, args.period, args.limit) loop.run_forever() loop.close()

Обратим внимание на несколько изменений: Результаты аналогичны полученным ранее.

  • Мы ушли от бесконечного цикла и засыпания к функции планирующей запустить себя после выполнения, во время которого вызывается _ensurefuture на нашей корутине запланированной выполнится.
    (Примечание переводчика: применили рекурсивный подход)
  • Поскольку _poll_top_stories_forcomments используется на каждой итерации цикла для того, чтобы запланировать запуск самой себя, мы естественно должны использовать функцию _runforever цикла событий, чтобы он был всегда запущен.

Что произойдет с нашей замечательной системой? Ладно, это все хорошо и даже превосходно, но, что если Бог недоступен — наше соединение разорвалось посередине выполнения задачи? Давайте проведем такую симуляцию добавив выброс исключения после нескольких запросов URL:


MAXIMUM_FETCHES = 5 class URLFetcher(): """ Обеспечивает подсчет URL выборок для определенной задачи """ def __init__(self): self.fetch_counter = 0 async def fetch(self, session, url): """ Получаем URL страницу используя aiohttp, который возвращает распарсерный JSON ответ. Как полагается в документации aiohttp мы используем сессию повторно. """ with async_timeout.timeout(FETCH_TIMEOUT): self.fetch_counter += 1 if self.fetch_counter > MAXIMUM_FETCHES: raise Exception('BOOM!') async with session.get(url) as response: return await response.json()

[12:51:00] Calculating comments for top 5 stories. (1) [12:51:00] Waiting for 5 seconds… [12:51:01] Exception in callback poll_top_stories_for_comments.<locals>.callback(<Task finishe…ion(‘BOOM!’,)>) at 05_periodic_coroutines.py:121 handle: <Handle poll_top_stories_for_comments.<locals>.callback(<Task finishe…ion(‘BOOM!’,)>) at 05_periodic_coroutines.py:121> Traceback (most recent call last): File “/Users/yeray/.pyenv/versions/3.6.0/lib/python3.6/asyncio/events.py”, line 126, in _run self._callback(*self._args) File “05_periodic_coroutines.py”, line 122, in callback fetch_count = fut.result() File “05_periodic_coroutines.py”, line 100, in get_comments_of_top_stories results = await asyncio.gather(*tasks) File “05_periodic_coroutines.py”, line 69, in post_number_of_comments response = await fetcher.fetch(session, url) File “05_periodic_coroutines.py”, line 58, in fetch raise Exception(‘BOOM!’) Exception: BOOM! [12:51:05] Calculating comments for top 5 stories. (2) [12:51:05] Waiting for 5 seconds…

Не так здорово, правда?

Переходить к следующей части этой серии, где я исследую возможности, которые у нас есть для обработки ошибок и остальных задач: Паттерны asyncio корутин: Ошибки и Отмена Что делать?

Теги
Показать больше

Похожие статьи

Добавить комментарий

Ваш e-mail не будет опубликован. Обязательные поля помечены *

Кнопка «Наверх»
Закрыть