Главная » Хабрахабр » [Из песочницы] Подписываемся на Kafka по HTTP или как упростить себе Веб-хуки

[Из песочницы] Подписываемся на Kafka по HTTP или как упростить себе Веб-хуки

Существует множество способов обработки сообщений из Pub-Sub систем: использование отдельного сервиса, выделение изолированного процесса, оркестрация пулом процессов/потоков, сложные IPC, Poll-over-Http и многие другие. Сегодня я хочу рассказать о том, как использовать Pub-Sub по HTTP и про свой сервис, написанный специально для этого.

Использование готового HTTP -бэкенда сервисов в некоторых случаях является идеальным решением для обработки очереди сообщений:

  1. Балансировка из коробки. Обычно, бэкенд и так стоит за балансировщиком и имеет готовую к нагрузкам инфраструктуру, что сильно упрощает работу с сообщениями.
  2. Использование обычного REST-контроллера (любой HTTP-ресурс). Потребление сообщений по HTTP сводит к минимуму затраты на реализацию консюмеров под разные языки, если бэкенд разношерстный.
  3. Упрощение использования Веб-хуков других сервисов. Сейчас почти каждый сервис (Jira, Gitlab, Mattermost, Slack…) так или иначе поддерживает Веб-хуки для взаимодействия с внешним миром. Можно облегчить жизнь, если научить очередь выполнять функции HTTP-диспатчера.

Этот подход имеет и минусы:

  1. Можно забыть о легковесности решения. HTTP тяжёлый протокол, а использование фреймворков на стороне консюмера мгновенно приведёт к увеличению задержки (latency) и нагрузки.
  2. Лишаемся сильных сторон Poll-подхода, получая слабые стороны Push.
  3. Обработка сообщений теми же инстансами сервиса, которые обрабатывают клиентов, может сказаться на отзывчивости. Это несущественно, так как лечится балансировкой и изоляцией.

Я реализовал идею в виде сервиса Queue-Over-Http, о котором и пойдёт речь далее. Проект написан на Kotlin с использованием Spring Boot 2.1. В качестве брокера сейчас доступна только Apache Kafka.
Далее в статье подразумевается, что читатель знаком с Kafka и знает про коммиты (commit) и оффсеты (offset) сообщений, принципы групп (group) и консюмеров (consumer), а также понимает, чем партиция (partition) отличается от топика (topic). Если есть пробелы, советую ознакомиться с этим разделом документации по Kafka перед продолжением чтения.
Queue-Over-Http представляет из себя сервис, который выступает посредником между брокером сообщений и конечным HTTP-консюмером (сервис позволяет легко реализовать поддержку отправки сообщений консюмерам любым другим способом, например, различными *RPC). На данный момент доступны только операции подписки, отписки и просмотра списка консюмеров Отправка сообщений брокеру (produce) по HTTP пока не реализована в силу невозможности гарантировать порядок сообщений без специальной поддержки со стороны продюсера.

В первом случае выключается автобаланс партиций. Ключевой фигурой сервиса является консюмер, который может подписаться как на конкретные партиции, так и просто на топики (паттерн топика поддерживается). Архитектурно каждый подписчик ассоциируется с нативным Java-клиентом Kafka. После подписки, указанный HTTP-ресурс начинает получать сообщения из назначенных партиций Kafka.

занимательная история про KafkaConsumer

У Kafka есть замечательный Java-клиент, который умеет многое. Его использую в адаптере очереди для получения сообщений от брокера и дальнейшей отправки в локальные очереди сервиса. Стоит оговориться, что клиент работает исключительно в контексте одного потока.

Запускаем в одном потоке, пишем простейший планировщик нативных клиентов, делая упор на уменьшение latency. Идея адаптера простая. То есть пишем что-то похожее:

while (!Thread.interrupted()) } val committed = doCommit() if (!hasWork && committed == 0) { // засыпаем, если нечего делать Thread.sleep(1) }
}

Казалось бы, всё замечательно, latency минимальный даже при наличии десятков консюмеров. На практике получилось, что KafkaConsumer к такому режиму эксплуатации совершенно не готов и даёт allocation rate около 1.5 МБ/сек в простое. При 100 консюмерах allocation rate достигает 150 МБ/сек и заставляет GC чаще вспоминать о приложении. Конечно, весь этот мусор находится в young области, GC вполне справляется с этим, но всё же, решение не идеально.

Это даёт оверхед по памяти и диспетчеризации, но другого выхода нет. Очевидно, нужно идти типичным для KafkaConsumer путём и каждого подписчика размещаю теперь в своём потоке.

ZERO на Duration.ofMillis(100). Переписываю код сверху, убирая внутренний цикл и меняя Duration. Однако, Poll с таймаутом в 100мс задерживает всю очередь коммитов на эти самые 100мс, а это неприемлемо много. Получается хорошо, allocation rate падает до приемлемых 80-150 КБ/сек на одного консюмера.

С этим методом путь к low-latency прост: когда приходит новый запрос на коммит, кладём его в очередь, а на нативном консюмере вызываем wakeup. В процессе поиска решений проблемы вспоминаю про KafkaConsumer::wakeup, который бросает WakeupException и прерывает любую блокирующую операцию на консюмере. За передачу управления с помощью исключений нужно сразу давать по рукам, но раз уж по-другому никак… В рабочем цикле ловим WakeupException и идём коммитить то, что накопилось.

Обработка этой ситуации захламит код флагом, разрешающим делать wakeup. Оказывается, и этот вариант далёк от совершенства, так как любая операция на нативном консюмере теперь выкидывает WakeupException, в том числе, сам коммит.

В итоге, был рождён франкенштейн из рефлексии, который в точности копирует оригинальный метод poll, добавляя выход из цикла по флагу. Прихожу к выводу, что было бы неплохо модифицировать метод KafkaConsumer::poll, чтобы он мог прерываться штатно, по дополнительному флагу. Этот флаг устанавливается отдельным методом interruptPoll, который, к тому же, на селекторе клиента вызывает wakeup, чтобы снять блокировку потока на операции ввода-вывода.

Реализовав таким образом клиент, получаю скорость реакции с момента поступления запроса на коммит до его обработки до 100 микросекунд, и отличный latency на выборку сообщений из брокера, что вполне устраивает.

Каждая партиция представлена отдельной локальной очередью, куда адаптер пишет сообщения из брокера. Воркер забирает из неё сообщения и отдаёт их на исполнение, то есть, на отправку по HTTP.

При подписке можно указать concurrencyFactor каждого топика (распространяется на каждую назначенную партицию независимо). Сервис поддерживает пакетную обработку сообщений для увеличения пропускной способности. Как только все сообщения из пачки были однозначно отработаны консюмером, сервис принимает решение об очередном коммите оффсета последнего по порядку сообщения в Kafka. Например, concurrencyFactor=1000 означает, что одновременно могут быть отправлены потребителю 1000 сообщений в виде HTTP-запросов. Отсюда второе значение concurrencyFactor — максимальное число повторно обработанных сообщений потребителем в случае падения Kafka или Queue-Over-Http.

Так как автокоммит на нативном клиенте отключен, такая схема не нарушает гарантий At-Least-Once.
Высокое значение concurrencyFactor увеличивает пропускную способность очереди (throughput) за счёт уменьшения количества коммитов, которые занимают до 10 мс в худшем случае. Для уменьшения задержек очередь имеет loadFactor = concurrencyFactor * 2, что позволяет считывать из брокера в два раза больше сообщений, чем может быть отправлено. При этом, повышается нагрузка на потребителя.

Очерёдность отправки сообщений в рамках пачки не гарантирована, но её можно достигнуть, если установить concurrencyFactor=1.

Коммиты — важная часть работы сервиса. Когда очередная пачка данных готова, оффсет последнего сообщения из пачки тут же коммитится в Kafka, и только после успешного коммита становится доступна для обработки следующая пачка. Часто этого недостаточно и требуется автокоммит. Для этого существует параметр autoCommitPeriodMs, который имеет мало общего с классическим периодом автокоммита у нативных клиентов, которые коммитят последнее прочитанное из партиции сообщение. Представим, что concurrencyFactor=10. Сервис отослал все 10 сообщений и ждёт готовности каждого из них. Первым завершается обработка сообщения 3, потом сообщения 1, а затем, сообщения 10. В этот момент наступает время автокоммита. Важно не нарушить At-Least-Once семантику. Поэтому, можно коммитить только первое сообщение, то есть, оффсет 2, так как только оно на этот момент успешно обработано. Далее, до следующего автокоммита обрабатываются сообщения 2, 5, 6, 4, и 8. Теперь необходимо коммитить только оффсет 7, и так далее. Автокоммит почти не влияет на пропускную способность.
В штатном режиме работы сервис отправляет сообщение консюмеру один раз. Если по каким-то причинам оно вызвало 4xx или 5xx ошибку, то сервис будет повторно отправлять сообщение, ожидая успешной обработки. Время между попытками может быть сконфигурировано отдельным параметром.

Не советую использовать это для чувствительных данных, ситуации отказа консюмеров всегда нужно корректировать вручную. Так же, возможно задать количество попыток, по истечению которых сообщение будет помечено как обработанное, что прекратит повторные отправки не зависимо от статуса ответа. Залипания сообщения можно отслеживать по логам сервиса и мониторингу статусов ответа консюмера.

про залипания

Закрытое таким образом TCP-соединение остаётся в статусе TIME_WAITED, пока не будет подчищено операционной системой спустя какое-то время. Обычно, HTTP-сервер, отдавая 4xx или 5xx статус ответа, отсылает ещё и заголовок Connection: close. Это может вылиться в отсутствие свободных портов на машине для установки TCP-соединения и сервис будет сыпаться исключениями в логи на каждую отправку. Проблема в том, что такие соединения занимают целый порт, который невозможно переиспользовать до освобождения. В стандартном режиме работы это не проблема.
На практике, на Windows 10 порты кончаются спустя 10-20 тысяч отправок ошибочных сообщений в течение 1-2 минут.

Каждое сообщение, извлечённое из брокера, отправляется консюмеру по HTTP на указанный при подписке ресурс. По умолчанию сообщение отправляется POST-запросом в теле. Это поведение можно изменить, указав любой другой метод. Если метод не поддерживает отправку данных в теле, можно указать название строкового параметра, в котором будет отправлено сообщение. Помимо этого, при подписке можно указать дополнительные заголовки, которые будут добавлены к каждому сообщению, что удобно для базовой авторизации с помощью токенов. К каждому сообщению добавляются заголовки с указанием идентификатора консюмера, топика и партиции, откуда сообщение было прочитано, номер сообщения, partition key, если применим, а также название самого брокера.
Для оценки производительности я использовал ПК (Windows 10, OpenJDK-11 (G1 без тюнинга), i7-6700K, 16GB), на котором запущен сервис и ноутбук (Windows 10, i5-8250U, 8GB), на котором крутился продюсер сообщений, HTTP-ресурс консюмера и Kafka с дефолтными настройками. ПК подключен к роутеру по проводному соединению 1Gb/s, ноутбук по 802.11ac. Продюсер каждые 100 мс в течении 1000 секунд записывает сообщения, длиной в 110 байт, в назначенные топики, на которые подписаны консюмеры (concurrencyFactor=500, автокомит выключен) из разных групп. Стенд далёк от идеального, но некоторую картину получить можно.

Ключевым измеряемым параметром является влияние сервиса на latency.

Именно dt является влиянием сервиса на latency сообщения. Пусть:
— tq – временная метка получения сервисом сообщения от нативного клиента
— dt0 – время между tq и временем отправки сообщения из локальной очереди в пул экзекьютеров
— dt – время между tq и временем отправки HTTP-запроса.

В ходе измерений были получены следующие результаты (C – консюмеры, T – топики, M — сообщения):

Максимальные значения dt (около 60мс) не указаны специально, так как зависят от работы GC, а не от самого сервиса. В стандартном режиме работы сервис сам по себе почти не влияет latency, а потребление памяти минимально. Сгладить разброс максимальных значений может помочь специальный тюнинг GC или замена G1 на Shenandoah.

В этом режиме увеличивается потребление памяти, так как время ответа на запросы сильно вырастает, что мешает своевременной очистке ресурсов. Всё кардинально меняется, когда консюмер не справляется с потоком сообщений из очереди и сервис включает режим тротлинга. Влияние на latency здесь остаётся на уровне с предыдущими результатами, а высокие значения dt вызваны предзагрузкой сообщений в локальную очередь.

Если кто-то может помочь с организацией замеров на больших нагрузках, с радостью предоставлю сборку для тестов. К сожалению, протестировать на бОльшей нагрузке не имеется возможности, так как ноутбук загибается уже на 1300 RPS.

Теперь перейдём к демонстрации. Для этого нам понадобится:

  • Kafka брокер, готовый к работе. Я возьму поднятый на 192.168.99.100:9092 инстанс от Bitnami.
  • HTTP-ресурс, который будет принимать сообщения. Для наглядности я взял Web-hooks у Slack.

Прежде всего, необходимо поднять сам сервис Queue-Over-Http. Для этого создим в пустой директории application.yml следующего содержания:

spring: profiles: default logging: level: com: viirrtus: queueOverHttp: DEBUG app: persistence: file: storageDirectory: "persist" brokers: - name: "Kafka" origin: "kafka" config: bootstrap.servers: "192.168.99.100:9092"

Здесь мы указываем сервису параметры подключения конкретного брокера, а также, где хранить подписчиков, чтобы между запусками они не терялись. В `app.brokers[].config` можно указывать любые, поддерживаемые нативным клиентом Kafka параметры подключения, полный список можно посмотреть здесь.

В том числе, настраивать логирование. Так как файл конфигурации обрабатывается Spring’ом, вы можете писать туда много интересного.

Используем самый простой способ – docker-compose.yml: Теперь запускаем сам сервис.

version: "2" services: app: image: viirrtus/queue-over-http:0.1.3 restart: unless-stopped command: --debug ports: - "8080:8080" volumes: - ./application.yml:/application.yml - ./persist:/persist

Если этот вариант не устраивает, вы можете собрать сервис из исходников. Инструкция по сборке в Readme проекта, ссылка на который дана в конце статьи.

Для этого необходимо выполнить HTTP-запрос к сервису с описанием консюмера (Consumer): Следующим шагом регистрируем первого подписчика.

POST localhost:8080/broker/subscription
Content-Type: application/json { "id": "my-first-consumer", "group": { "id": "consumers" }, "broker": "Kafka", "topics": [ { "name": "slack.test", "config": { "concurrencyFactor": 10, "autoCommitPeriodMs": 100 } } ], "subscriptionMethod": { "type": "http", "delayOnErrorMs": 1000, "retryBeforeCommit": 10, "uri": "<slack-wh-uri>", "additionalHeaders": { "Content-Type": "application/json" } }
}

Если всё прошло успешно, в ответе будет почти тот же самый отправленный контент.

Пройдёмся по каждому параметру:

  • Consumer.id — идентификатор нашего подписчика
  • Consumer.group.id — идентификатор группы
  • Consumer.broker — указываем на какой из брокеров сервиса нужно подписаться
  • Consumer.topics[0].name — название топика, из которого хотим получать сообщения
  • Consumer.topics[0].config. concurrencyFactor — максимальное количество одновременно отправленных сообщений
  • Consumer.topics[0].config. autoCommitPeriodMs — период принудительного коммита готовых сообщений
  • Consumer.subscriptionMethod.type — тип подписки. В данный момент доступна только HTTP.
  • Consumer.subscriptionMethod.delayOnErrorMs — время до повторной отправки сообщения, которое закончилось ошибкой
  • Consumer.subscriptionMethod.retryBeforeCommit — количество попыток повторной отправки ошибочного сообщения. Если 0 – сообщение будет крутиться до успешной обработки. В нашем случае гарантия полной доставки не так важна, как постоянство потока.
  • Consumer.subscriptionMethod.uri — ресурс, на который будут отправляться сообщения
  • Consumer.subscriptionMethod.additionalHeader — дополнительные заголовки, которые будут отправлены с каждым сообщением. Пометим, что в теле каждого сообщения будет JSON, чтобы Slack мог правильно интерпретировать запрос.

В данном запросе указание HTTP-метода опущено, так как умолчание, POST, Slack вполне устраивает.

С этого момента сервис следит за назначенными партициями топика slack.test на предмет новых сообщений.

Для записи сообщений в топик я воспользуюсь встроенным в Kafka утилитами, которые расположены в /opt/bitnami/kafka/bin запущенного образа Kafka (расположение утилит в других инстансах Kafka может отличаться):

kafka-console-producer.sh --broker-list localhost:9092 --topic slack.test
> {“text”: “Hello!”}

В этот же момент Slack оповестит о новом сообщении:

Чтобы отписать потребителя достаточно сделать POST запрос на `broker/unsubscribe` с тем же контентом, что был при подписке.

На данный момент реализован лишь базовый функционал. Далее планируется улучшить batching, попытаться реализовать Exactly-once семантику, добавить возможность отправки сообщений брокеру по HTTP и, самое главное, добавить поддержку других популярных Pub-Sub.

Версия 0. Сервис Queue-Over-Http сейчас находится в стадии активного развития. 3 является достаточно стабильной для тестирования на dev и stage стендах. 1. 04. Работоспособность была проверена на Windows 10, Debian 9 и Ubuntu 18. Если вы хотите помочь с разработкой или дать любой фидбэк по сервису – добро пожаловать на Github проекта. Использовать в prod можно на свой страх и риск.


Оставить комментарий

Ваш email нигде не будет показан
Обязательные для заполнения поля помечены *

*

x

Ещё Hi-Tech Интересное!

Тест: подходит ли тебе удаленка (не фриланс!)?

Эта статья для тех, кто задумывается, стоит ли пробовать удаленную работу, и взвешивает риски: стиль жизни существенно поменяется, мало ли, что пойдет не так, а вернуться обратно будет непросто. Я не буду преподносить удаленку как райское наслаждение на пляжике под ...

Кипр — минутка мягкого психодела

Фламинго в Ларнаке на Кипре. Поселение, кстати, по-нашему будет «Гробово», потому что «ларнака» — это саркофаг, а их тут в окрестностях нашли немало. Так город и назвали. Здесь тепло, приятно, рядом море, вокруг солнце, небо — ну как в таких ...