Хабрахабр

Дао интеграции Сбербанка: от локальных сетей к Kafka и потоковой разработке

Привет, Хабр! Меня зовут Михаил Голованов, в Сбертехе я занимаюсь технической архитектурой и перспективными разработками. У нас, как и у любого современного банка, есть множество систем, которые поддерживают разные стороны работы банка: вклады, счета, зачисление денег, кредитование, финансовые рынки, акции и т.д. Всякий раз, когда появляется какая-то новая система, мы начинаем следующий уровень увлекательной игры под названием «Интеграция». И каждый следующий уровень сложнее предыдущего — ведь систем нужно охватывать все больше и больше. Этот пост — то, что в геймерских кругах именуется walkthrough: сначала мы пробежимся по локальным сетям и затем через очереди сообщений перейдем к масштабному этапу потоковых вычислений посредством Apache Kafka в широко распределенных сетях.  


Сначала немного теории — перечислим, что мы оцениваем в интеграции с учетом банковской специфики:

  • Производительность. Все просто: чем выше, тем лучше.
  • Латентность — задержки передачи информации. Допустимая латентность зависит от того, с какими системами мы имеем дело. Если вы придете в банкомат снять деньги с карты, то лишняя секунда погоды не сделает. А если ждать заставят 30 секунд, то вряд ли вам это понравится. Бывают операции, в которых латентность не так важна. Если вы подаете заявку на кредит, то вполне можете подождать решения десять минут — и тут 30 секунд не принципиальны. Но в принципе, чем ниже, тем лучше.
  • Масштабирование. Масштабирование бывает двух видов. При вертикальном масштабировании вы добавляете мощностей на одной машине, и у вас  увеличивается производительность. При горизонтальном — ставите рядом с машиной еще n-ное количество таких же.
  • Отказоустойчивость. Вот это для нас очень важно. Если в банке что-то отказывает и клиенты не обслуживаются — это очень плохо для всех. Сюда можно приписать еще один важный показатель — время восстановления.
  • Консистентность. Предположим, зачисление денег прошло, а списание нет. А вам нужно подбивать баланс. Второй пример: вы отправляете перевод, и у вас деньги с карточки списались, а тому человеку, которому вы их перечисляете, они не зачислились. Это значит, что система находится в неконсистентном состоянии. И вызывает массу неудобств. Очень желательно, чтобы все данные находилась в консистентном состоянии.

Начало истории

Первым был этап локальных сетей — формирования классической двухзвенной архитектуры и рассвет серверов баз данных (MS SQL, Oracle и других). В Сбербанке был большой, мощный сервер базы данных, который обслуживал всю организацию. Клиентские машины по локальной сети подключались к нему, получали и записывали информацию.

Количество пользователей бизнес-приложений стало очень быстро расти. Затем началось активное распространение интернет-технологий. База данных выполняла роль хранилища. Мы уперлись в возможности сервера базы данных и перешли к трехзвенной схеме. Тонкий клиент — браузер — подключался к серверу приложений и взаимодействовал с конечным пользователем. В сервере приложений находилась бизнес-логика — правила манипуляции информацией.

У этой архитектуры есть много преимуществ:

  • Не нужно ставить на машины клиентское ПО и обновлять программы — достаточно обновить сервер приложений и базу данных, и сразу всем клиентам становилась доступна новая версия.
  • База данных реляционно хранит все данные организации —  за счет проверки ключей и поддержки транзакций мы автоматически получаем консистентность.
  • JEE-сервера приложений хорошо кластеризуются, масштабируются и берут на себя большую часть работы по бизнес-логике.
  • Интерфейсы веб-приложений на основе JavaScript по насыщенности и возможностям приближаются к нативным.

Лет 10-15 назад это было очень круто и сильно облегчило жизнь.

Клиент запрашивал информацию с сервера приложений и блокировался — ждал, пока не получит ответ. Следующим камнем преткновения стало синхронное взаимодействие составляющих системы. Здесь производительность серьезно проседала. И серверные компоненты также ожидали ответов друг от друга.

После записи в такую очередь вызывающий компонент не ждет ответа и может выполнять другую полезную работу. Для решения проблем с синхронным обменом используется промежуточное ПО для создания очередей сообщений. Которые вызывающий компонент слушает в отдельном потоке. Обрабатывающий компонент читает поступившие в очередь сообщения и формирует ответы.

Достоинства такой архитектуры:

  • Асинхронный обмен значительно повышает производительность систем.
  • Если ваш сервер на некоторое время остановился, клиент об этом не узнает. Он будет просто закидывать запросы на обработку, пока очередь доступна. В это время можно быстро и незаметно поднять серверную часть, вычитать из очереди и обработать то, что поступило в течение последних нескольких минут. В итоге клиент практически ничего не заметит.
  • Если централизовать очереди в брокере сообщений, на предприятии можно получить единую точку управления информационными потоками.

Таким образом мы построили нашу внутреннюю архитектуру обработки информации лет семь назад. Все стало замечательно.

Встречаем Kafka

Два года назад мы решили переходить с  платных продуктов крупных вендоров на более функциональный open source. Посмотрели, что можно сделать с очередями сообщений, решили оставить архитектуру интеграции без изменений, а на open source перевести очереди сообщений. Просканировали рынок и наткнулись на Apache Kafka — распределенный программный брокер сообщений с открытым кодом, написанный на Scala и Java (в банке это наш основной технологический стек). Тогда была актуальна Kafka версий 0.8–0.9.

Существующие очереди на том же оборудовании вытягивали в лучшем случае 5-7 тысяч. Быстро развернули пилот: производительность Kafka оказалась как минимум в несколько раз выше нашего решения, десятки тысяч сообщений в секунду, а то и больше, около ста.

Топология получилась сложная: возникали шлюзы, которые распределяли нагрузку, обеспечивали работу кластера Message Queue и т.д. В нашем предыдущем Message Queue (MQ) построение кластера требовало много нетривиальных действий. Если нужно отключить машину, то достаточно только остановить Kafka на узле — брокер сам выйдет из кластера. С Kafka все оказалось проще: ставим новую машину, поднимаем на ней Kafka, прописываем номер узла Kafka в кластере, и узел подключается к кластеру сам. Масштабирование при этом получаем близкое к линейному: поставите второй брокер — будет обрабатывать в два раза больше, если третий — то в три. Таким образом легко можно масштабироваться в обе стороны. На пилоте ставили десять узлов, и эта зависимость сохранялась.

Kafka поддерживает сразу два унифицированных стиля взаимодействия.

  • Point-to-point — кто-то выкладывает обработчику информацию, обработчик ее забирает, и друг с другом взаимодействуют только эти две стороны.  
  • Publish-subscribe — когда кто-то выкладывает информацию, и ее читает сразу много потребителей.

В старых парадигмах JMS это были два разных интерфейса, а в Kafka все унифицировано, и разницы между Point-to-point и Publish-subscribe нет, в том числе и в API для программиста. Это большое упрощение.

В очередях это работает иначе. В Kafka все сообщения являются персистентными, то есть записываются на диск. Как только мы включаем режим персистивности — запись сообщений на диск — производительность падает в несколько раз. Когда все сообщения в очереди хранятся в оперативной памяти, все работает достаточно быстро — несколько тысяч сообщений в секунду. А у нас немало данных, которые не хочется терять — например, данные о переводе денег. Однако без этого режима не обойтись, ведь информация в оперативной памяти стирается, как только выключается машина. В Kafka сообщения персистентные «из коробки», и при этом все работает быстро.

Переезжаем с JMS на Kafka

Быстрее, удобней, да еще и бесплатно. Ура, бросаем JMS, переезжаем на Kafka! Раньше у нас были обычные очереди, а теперь Kafka-топики. Суть та же: записали в топик и забыли, а кто-то с другой стороны асинхронно читает.

Как это все устроено внутри? Кафка, по сути дела, это append-only distributed log, то есть лог, запись в который всегда идет в конец. Для обеспечения масштабирования топик разделен на партиции. В каждой партиции всегда есть start offset (номер первого записанного сообщения) и end offset (номер последнего записанного сообщения). Запись всегда происходит в конец лога, и номера сообщений непрерывно монотонно увеличиваются. Последовательная запись на диск осуществляется с хорошей скоростью и обеспечивает персистентность — в отличие от записи в произвольный участок файла, особенно медленной на HDD.

Экземпляру читателя при создании присваивается группа, в соответствии с которой читатель  подписываются на топики — наборы партиций (логов). Что происходит на стороне читателя?

Если что-то было записано, Kafka эти данные отдает. Читатель бесконечно осуществляет вызов (poll), то есть запрашивает у Кафки данные. Так в топике можно сделать много партиций и посадить много читателей. Читатель их обрабатывает, в ответ сообщает commit, и указатель перемещается вперед на одно сообщение. Если мы хотим, чтобы все работало быстрее, мы увеличиваем количество партиций в топике и читателей в группе, и за счет распараллеливания все работает быстрее. Одна партиция в одной группе читается одним читателем, и это достаточно простая и понятная схема масштабирования. В Kafka писатели называются продьюсерами, а читатели консьюмерами.

Проблемы с пачками сообщений

Опытный стенд на Kafka радовал всем, кроме кое-какого неприятного момента: одни и те же сообщения начали дублироваться. Сразу подумали, что дело в том, кто записывает. Но нет, запись шла один раз, а чтение — два, три, порой даже четыре раза. В результате в Кафке падала производительность и появлялось большое количество дублей.

Оказывается, группы читателей работают немного по-своему. В рамках одного цикла опроса обработчик получает сразу пачку сообщений. Дальше он начинает ее обрабатывать и должен коммитить обработку сообщений. В нашем случае в обработке были не только одиночные сообщения — некоторые собирали в себе пачку информации, много бизнес-событий. Обработчик получает такую пачку, например, из 200 событий и последовательно начинает обрабатывать каждое из них.

Если за один poll приходило две-три большие пачки сообщений, время тайм-аута часто истекало и консьюмер выкидывало. Тем временем брокер начинает отсчет тайм-аута — некоторого интервала времени, после которого, не получив коммит обработки сообщения, он начинает считать обработчика мертвым, выбрасывает его из группы и ставит взамен одного из живых. Вместо якобы мертвого Kafka подставляла соседнего, якобы живого консьюмера. На Кафке начинался ребаланс — перестроение группы консьюмеров, когда присодединяли нового или, как в нашем случае, выбрасывали старого. Через некоторое время в ней, по мнению сервера, вообще не оставалось живых консьюмеров, и чтение прекращалось. Пачки сообщений начинали по кругу убивать всю группу.

Первый вариант: давайте не будем передавать пачками. Что делать? Может, создадим отдельный пул сообщений? Но система, которая их передавала, иначе работать не могла, потому что была не очень онлайновой. Но тогда мы разорвем чтение и обработку сообщений, и схема станет хрупкой.

В этот момент как нельзя кстати вышла десятая версия Kafka с несколькими очень полезными для нас фичами:

  • KIP-62 – heartbeat в отдельном потоке. Раньше подтверждение того, что обработчик живой и сама обработка шли в одном потоке. В десятой Kafka ввели отдельное сообщение «я живой», которое можно кидать не в основном а в отдельном фоновом потоке. И идут эти сообщения гораздо быстрее, чем обработка основной пачки. Ребаланса не происходит, и мы можем довольно долго прокручивать большие сообщения.
  • KIP-41 – максимальное число сообщений в одном poll. Раньше оно было ограничено только доступной памятью. Если кто-то много записал, то обработчик мог сразу могли брать 10, 30, 50 сообщений. С десятой Кафки можно установить точное число: вычитываемых ообщений за один poll.
  • Выставление настроек таймаутов.

С новыми настройками система стала работать стабильно, и массовые дубли прекратились. Но все равно не до конца. В этот момент мы поняли: Kafka — это не совсем очередь. Это другая структура данных, партиционированный лог.

В очереди все читатели конкурентно читают сообщения, и поэтому она не всегда упорядочена. В чем разница? В очереди сообщения после прочтения удаляются, а в Кафке нет — просто двигается указатель прочитанных сообщений. В Кафке в рамках партиций чтение идет последовательно, и партиция всегда упорядочена. Сообщения удаляются пачками, как файлы из файловой системы — получается гораздо менее затратно, чем с очередями, которые удаляют каждое сообщение. Через некоторое время (тайм-аут) в Кафке удаляется весь файл, и начинается запись в новый файл (сегмент). Происходит ребаланс, и в течение некоторого времени брокер не отдает читателям никаких сообщений, пока ребаланс не произойдет. И, как мы описали выше, включение/отключение одного из консьюмеров в режиме чтения влияет на остальных.

Добиваем дубли

Урок выучили, систему стабилизировали, дубли свелись к долям процента, но полностью от них мы не избавились. Дубли возникали из-за ребалансов, избежать которых не получалось — как минимум, они происходили при вводе нового консьюмера в топологию, или если Кафка посчитает, что нужно провести какую-нибудь оптимизацию.

Было три варианта: Мы стали думать, что делать с оставшимися дублями.

  • Ничего не делать. Бывают такие задачи, где дубли совсем не страшны. Например, вы мониторите какой-то бизнес-процесс на предмет прохождения определенной стадии. Если вы два раза получили информацию об этом, ничего страшного. Или, например, если клиент запрашивает баланс по своей карте и два раза его получает.
  • Бороться с последствиями. Тут существует два основных подхода: дедупликация и компенсационная логика.  
  • Устранить причину — сделать такую систему, где дубли не будут возникать. Это самый правильный, но самый сложный путь.

Если ничего не делать, то тут ключевым понятием является идемпотентность. Идемпотентность — это если операция повторяется несколько раз, и это никак не влияет на системы с точки зрения стороннего наблюдателя, с точки зрения состояния данных.

Есть способы сделать бизнес-операции идемподентными, даже если они таковыми не являются. Идемподентными могут быть операции чтения или мониторинга. После первой обработки ID мы поменяли баланс на счете и сохранили информацию об этом. Например, при переводе денег мы генерируем уникальный ID — вводим уникальный идентификатор операции. Однако для этого надо каждый раз перерабатывать логику, делать анализ задач — в общем, способ затратный. В дальнейшем по ID мы определяем, что перевод уже сделан, и движение денег не производим.

Можно поставить общее хранилище, и когда сообщение приходит во второй раз, его игнорировать. Второй подход – создание дедупликатора. А поскольку обычно хранилище удаленное, мы получаем дополнительно один сетевой вызов и рост латентности. Но здесь необходимо завести дополнительное хранилище для трафика вызовов — на больших объемах оно может стать точкой отказа и причиной падения производительности. На небольших нагрузках дедупликатор — вполне рабочая схема, но это не наш случай.

Наши прикладные программисты должны будут все время помнить, что операция может повториться. Третий подход — сделать компенсацию на уровне бизнес-логики. Это трудоемко и может вызывать множество ошибок, поэтому компенсационная логика — крайний вариант. А как определить, повторилась ли она из-за интеграции или пользователь действительно каждую секунду пытается перевести кому-то пять рублей?

Тогда повторные операции будут отклоняться, потому что будет повтор транзакции. Была мысль о том, чтобы добавлять к операциям транзакции. Однако в Кафке она не поддерживается и вряд ли будет поддерживаться. В Java есть даже технология распределенной транзакции (XA-транзакция).

Как и ожидалось, остается бороться с причиной.

Переносим commit до обработки

Когда идет ребалансировка группы, то закоммитить сообщение старым консьюмером уже нельзя. Возникает ошибка о том, что этот консьюмер больше не работает с этой партицией. Мы всегда делали commit после обработки, но можно перенести коммит до обработки. Тогда консьюмер прочитает сообщение из Кафки и сразу подтвердит прочтение.

В этом случае мы потеряем это сообщение, ведь Кафка считает, что уже нам его отдавала, а обработчик еще не отработал до конца. А что если в момент, когда мы уже закоммитили, но еще не обработали, произойдет сбой в обработчике? Ее можно использовать для некоторых, не очень важных операций. Такая гарантия обработки сообщения называется at most once, то есть «максимум один раз». Но не для операций, связанных с деньгами,-  ведь потерять перевод никому не хочется.

Назначаем обработчикам топики

Можно не использовать механизм автобалансирующихся групп читателей, а явно назначать каждой партиции обработчик с помощью вызова метода assign. Это когда мы явно говорим: ты обработчик, вот твой топик, вот твоя партиция, работай с ней. В этом случае можно делать ранний коммит — at most once, а можно, для гарантии, поздний — at least once. За счет того, что только один обработчик производит коммиты и обработку, если сильно постараться, можно сделать и exactly once — то есть точно один раз.

Вы прибили гвоздем обработчик к партиции. Но чем плох assign? Для администратора системы это достаточно трудозатратно: нужно следить, чтобы обработчики были живы, вручную перезапускать их и так далее. Теперь, если он умер, с ним надо что-то делать: перезапускать, смотреть, что он последнее обработал, и так далее. И если в процессе появляется человек, вы можете забыть про быстрое время восстановления системы, поскольку у него сразу появляется желание разобраться, что было обработано, а что – нет. То есть мы начинаем делать работу консьюмер-группы. Мы получаем exactly once, но сильно теряем в отказоустойчивости. Люди в лучшем случае реагируют за минуты, компьютеры — за доли секунды. И должны будем очень много тратить на эксплуатацию.

Переоценка распределенных сетей

В итоге окончательное покорение Кафки мы тогда отложили. Вернулись к вопросу через год-полтора. Нас устраивала производительность, масштабируемость, отказоустойчивость. Вот только с консистентностью беда — проклятые дубли. Вряд ли опытные разработчики Кафки могли проигнорировать такую проблему. Возможно, мы неправильно ей пользовались? Нет. Решение скрывалось на еще более глубоком уровне, чем можно было предположить.

Этим принципам посвящен труд Л. Как оказалось, в большой распределенной среде просто не работают некоторые принципы, на которых держалось раньше наше проектирование IT-систем. Питера Дойча «Fallacies of distributed computing», написанный в 1994-1997 гг.

  1. Сеть больше не так надежна, как раньше. Из-за большого числа элементов она не может все время работать быстро и безотказно.
  2. Задержки передачи информации больше не равны нулю, как в локальной сети. Да, скорость доступа между памятью самая высокая. Если же мы связываемся с дисками несколько десятков раз, производительность таким же образом падает. А если еще и с сетью связываемся, замедление бывает и в сто раз. Нельзя пренебрегать задержкой взаимодействия между распределенными компонентами.
  3. Пропускная способность конечна. При больших объемах сети мы быстро упираемся в потолок, особенно при взаимодействии с удаленными серверами.
  4. Сеть больше не безопасна. При работе через интернет невозможно контролировать все, могу где-нибудь что-нибудь взломать.
  5. Топология все время меняется. Какие-то машины все время включаются или выключаются. Среди тысяч серверов Google порядка десятка всегда в неработоспособном состоянии.
  6. Администратор уже не один. Их могут быть сотни, каждый по-своему управляет своей частью системы.

Приняв эти истины, мы сформулировали три основные характеристики распределенной системы:

Сбои – это норма 1.

Неработоспособность системы – это отклонение от каких-то характеристик или полный выход из строя. Если на одной машине сбой был экстраординарным событием, то когда у вас много машин, постоянно где-то что-то не работает. Сбой – это самоустраняющийся отказ. Нарушение работоспособности – это отказ. Чем больше система, тем отказы в ней разнообразнее и чаще. И нам нужно делать такие системы, чтобы мы переживали сбои. Потому что бегать по всей большой системе и все руками чинить – с ума сойти можно. Нужно добиваться, чтобы отказы превращались в сбои, чтобы отказы самоустранялись.

Координация сложна 2.

Координация сложна. Чем больше машин, тем сложнее обеспечить координацию, тем более через сеть. Чем меньше различные части системы координируются между собой, тем лучше. Задержка по сети между узлами, ненадежность связи, изменчивая топология — бороться с этим нельзя, нужно просто стараться этого избегать. Если они справляются независимо друг от друга, это идеальный вариант.

Время неоднородно. 3.

И на разных компьютерах тоже разное время. В разных частях системы за счет задержек разное время. Например, если вы делаете чат, то важно не конкретное время, а порядок. Очень часто при проектировании путают три вещи: время, длительность и порядок сообщений. Бывает что важна длительность, а не порядок. Если вам отправили вопрос, а вы на него ответили, важно, чтобы все первым увидели вопрос, а вторым – ответ. Например, если вы замеряете тайм-аут.

Но хуже всего то, что время плавает. Даже самые точные атомные часы тоже плавают. Обычные кварцевые часы на локальной машине могут сбиваться на пару миллисекунд из-за разогрева машины и других физических причин. Синхронизация времени между машинами дает десятки миллисекунд. С этим нужно смириться и понимать, что в разных частях вашей системы время разное.

До этого момента у нас было два основных варианта: С учетом новых условий пришлось переоценить вопрос обработки информации.

  • Batch. Вы накапливаете определенный массив информации и запускаете «молотилку», которая производит расчеты в оффлайне и выдает результат. Расчет может длиться минуты, часы, дни, но мы можем спокойно переработать большие объемы. Если что-то сломалось или мы поняли, что у нас ошибка в алгоритме, входной массив информации не меняется — это хорошо. Мы можем устранить ошибки, снова запустить и получить ответ, который нас устраивает. Это происходит не в онлайне, и результат всегда детерминирован. Если входной массив и алгоритм не менялись, вы, конечно, получите те же результаты.
  • Request-Reply. Этот онлайн-вариант используется, когда нужно получить результат быстро, как в веб-браузере, например. Вы даете какой-то базе данных запросы и быстро получаете ответ. Но поскольку эти вызовы никак не упорядочены, воспроизвести это вы больше не можете. В следующую секунду состояние базы может измениться, и, кинув тот же запрос, вы получите совершенно другой результат. То есть, результат недетерминирован, но его можно получить быстро.

В каждом случае приходится идти на жертвы. А можно ли и точно, и быстро? Для ряда случаев мы нашли способ.

Потоковая архитектура

Итак, мы живем в распределенной системе со своими особенностями. Kafka имеет ограничения, связанные с дублями. А централизованную реляционную базу нельзя раздувать вечно — есть ограничения в масштабировании. Что делать? Попробуем реализовать часть задач в потоковой архитектуре. Наше знакомство с ней началось со статьи «Introducing Kafka Streams: Stream Processing Made Simple» Джея Крепса, нынешнего CEO Confluent, разработчика Кафки.

Поток – это упорядоченный по времени набор неизменяемых сообщений. Потоковая архитектура базируется на понятии потока. Если событие записано в журнал, его уже нельзя поменять. Все, что происходит в нашей системе, последовательно пишется в журнал по мере того, как события происходят во времени. Можно только выложить новое событие «корректировка пользователя» или что-то подобное. Если вы создали пользователя, то нельзя вернуться назад и отменить действие. Когда что-то случается, мы уже не можем вернуться назад и изменить то, что произошло. В общем, это сильно похоже на нашу жизнь. Мы можем только отреагировать на произошедшее и создать новое событие.

Соответственно, если модуль хочет взаимодействовать с новыми модулями, он отправляет событие в их журналы событий. Через потоки событий асинхронно обмениваются сообщениями модули бизнес-логики. Таким образом, вся система – это модули и взаимосвязь между ними в журналах событий.

Cостояние модуля – это результат обработки определенного потока. Поток событий бесконечен, они строго упорядочены по времени, и этот порядок после записи никогда не меняется. Делая это каждый раз, мы будем получать одно и то же конечное состояние, поскольку начальное состояние фиксировано, поток событий тот же, алгоритм обработки тот же. Если мы приведем модуль в начальное состояние X и проиграем определенный поток, мы получим состояние Y.

С помощью партиций. Как такую систему масштабировать?

События распределяются по ним в соответствии с ключами, которые мы событиям присваиваем. В примере выше создано три партиции. События внутри одной партиции упорядочены по времени. К1–К3 в первой партиции, К4–К6 во второй и К7–К9 в третьей. Обработал одно — перешел к следующему. К каждой партиции привязан обработчик, который последовательно обрабатывает события. Состояние обработчика определяется начальным состоянием и потоком. То есть он управляет своей локальной базой данных. Общая скорость работы системы зависит от количества и скорости обработчиков.

Мы просто бросаем в поток события. В этой схеме нет центральной базы и координации, потому что каждый обработчик занимается своей партицией и ничего не знает про остальных. Если такие события попадут в разные партиции, то из-за независимости обработчиков по времени у нас получится разбежка и результаты лягут не в те обработчики и все будет плохо. Разные события могут относиться к изменению одной сущности и быть логически связаны.

Как это сделать? Все логически связанные события должны оказаться в одной партиции, чтобы далее отправиться в один и тот же экземпляр разработчика. Есть два подхода.

Этот метод используется в Kafka Streams. Первый подход — партиционирование потока при записи. Записав этот ключ, мы можем спокойно работать дальше и быть уверены, что логически связанные события попадут в один обработчик. Мы должны из записываемой информации сгенерировать некоторый ключ, по которому определяется партиция. Недостаток способа: если меняется топология топика, приходится осуществлять репартиционирование — перераспределение данных по партициям, что весьма ресурсоемко.

Его можно реализовать с помощью оператора, доступного в Apache Flink и похожих движках. Второй подход — партиционирование потока при чтении. Но потом весь топик читается одним кластером, который при чтении каждого события вычисляет его ключ. В нем логически связанные события могут попадать в разные партиции. Недостаток способа: дополнительное сетевое взаимодействие. Вычислив ключ, он понимает, где в топологии кластера находится нужный обработчик, и отправляет событие туда. Если мы читаем на одном узле кластера, а обработчик находится на другом узле, то отправлять приходится через сеть.

Между этими способами нельзя сделать однозначный выбор, нужно в каждом конкретном случае учитывать ресурсы и требования.

Отказоустойчивость обработчиков и хранилища

Бывает, что обработка каждого события не зависит от того, какие события приходили раньше и в каком состоянии находится обработчик. В таких задачах обработчику не нужна даже локальная база данных для хранения его состояния. Такой класс задач называется stateless processing.
Чаще всего у нас возникают другие задачи — аналитика, агрегация — которые зависят от тех событий, что были раньше. В этом случае возникает необходимость в хранилище данных. Его можно сделать в виде key-value. Тогда обработчик при обработке каждого события будет помещать необходимые данные агрегации, расчетов и другую историю в один из ключей свего key-value хранилища.

Можно периодически сбрасывать его данные в какое-нибудь персистентное хранилище — сохранять снэпшоты. Как обеспечить отказоустойчивость такого хранилища? С помощью серии таких снэпшотов можно понять, когда в каком состоянии находилась система. В тот же снэпшот мы можем положить и последний offset партиций, который был удачно обработан. Такая техника очень полезна в двух случаях: Если произойдет сбой в обработке потоков, можно будет просто развернуть подходящий снэпшот и offset, запустить реплей потока, заново обработать кусок лога и снова идти вперед.

  • Исправление ошибок в обработчике. Допустим, в какой-то момент времени алгоритмы обработчика допустили ошибку логики, и дальше все события начали обрабатываться неправильно. Тогда мы находим последний снэпшот до ошибки, откатываем систему, ставим обработчик с хорошей, проверенной логикой, проводим реплей потока — вуаля, ошибка нивелирована.
  • Аварийная остановка обработчика. При этом база данных может оставаться в консистентом состоянии. Это поможет обеспечить корректность результатов при перезапуске.

Проблемы могут возникнуть не только с обработчиками. Хорошо бы обеспечить и отказоустойчивость самого хранилища. Для этого есть два подхода:

  1. Асинхронная репликация хранилища на другие узлы. Этот подход использует Kafka Streams. Подход основан на том, что любое key-value хранилище можно превратить в поток и наоборот. Для этого необходимо, чтобы ключи и их порядок в хранилище были независимы. Ключом события является ключ в хранилище, значением — то, что находится в хранилище по ключу. Если на основном узле возникли проблемы, можно обратиться к узлу репликации, поднять там обработчик, он переедет на другой узел, и все будет надежно.
  2. Сохранить состояние хранилища в доступном всему кластеру ресурсе — например, в Hadoop или другом распределенном хранилище. Схема работы примерно такая же, только обработчик читает файловую систему по команде из кластера. Затем загружает данные локально, и все снова работает.

Мы научились делать надежные обработчики и хранилища. Kafka обеспечивает надежное хранение потока. Осталось понять, что делать с консистентностью.

Что делать с консистентностью?

Известно, что каждая партиция потока обрабатывается независимо, своим обработчиком. Какие-то обработчики чуть быстрее, какие-то отстают.


Красными флажками обозначены последние сообщения, которые прочитали обработчики: K2, K4 и K7.

Но если не прерывать работу обработчиков, а только остановить запись новых сообщений, то рано или поздно все обработчики справятся со всеми событиями своих потоков, и базы данных придут в консистентное состояние. Если остановить время и посмотреть на базы данных обработчиков в какой-то произвольный момент, они будут не совсем консистентны. Это хуже, чем в реляционных базах данных, где снимок БД будет консистентен в любой момент. Такая гарантия консистентности называется eventual consistency. Но гораздо лучше, чем полное отсутствие консистентности.

Насколько это критично, нужно смотреть по конкретным задачам. Ослабление консистентности — это то, от чего в нашей архитектуре не убежать. Но это сделает архитектуру очень хрупкой. Если разбежки недопустимы, то нужно искать другой вариант, либо модифицировать эту архитектуру: искусственно замедлять более быстрые обработчики. Те, кому удавалось подобное, расплачивались пропускной способностью и латентностью.

Люди, работавшие с реляционными базами данных, могут предложить сделать распределенные транзакции. Как нам усилить консистентность? Как только транзакция коммитится, изменения становятся видны сразу пачкой. В разных партициях берутся разные события, но их обработку мы включаем в одну транзакцию. Если транзакция отменена, изменения видны не будут.

Но проблема в том, что в сильно распределенной системе распределенные транзакции работают очень медленно и плохо. Идея хорошая. В итоге в больших распределенных системах со сложной логикой распределенные транзакции не работают или работают очень медленно. Возникает много точек синхронизации, блокировок, с ростом числа участников транзакции ситуация становится хуже и хуже.

Агрегация событий

До сих пор мы говорили об обработке единичных событий. Но есть такой класс задач — алгоритмы аналитики — где результатом обработки потока являются агрегаты событий. Например, в каждом событии платежа передается сумма. Мы, проанализировав эти суммы, хотим знать, сколько всего клиент заплатил, сколько перевел на другой счет за какое-то время. Либо хотим получить какие-то средние значения, минимальные, максимальные… вариантов много.

Мы не знаем общее число событий в потоке — оно постоянно увеличивается. Как это реализовать? Соответственно, чтобы отвечать на такие вопросы в нашей архитектуре, нужно разбить поток на окна агрегации. Но мы можем взять события за определенный промежуток времени, зафиксированные в журнале и посчитать.

Рубим весь поток, например, на часовые окна и внутри каждого можем проводить агрегацию. Самый простой вид окно агрегации — tumbling window — когда эти окна без разрывов следуют друг за другом. Вот у вас открылось окно. Как можно это сделать? Обработчик открывает в базе данных специальную область для этого окна, и в ней начинается подсчет того, что происходит — суммируются события с одинаковым ключом.

Когда окно закроется, в верхнем синем квадрате у нас будет два события с ключом K1. В верхнем окне у нас два события с ключом K1. Затем откроется следующее окно, где могут быть уже другие события с другим количеством. При закрытии окна эту информацию мы можем скинуть в поток (отреплицировать наше хранилище), и поток сможет прочитать тот, кому нужна эта аналитика. В любой момент времени есть открытое окно, в котором хранятся поступающие события.

Таким образом мы превращаем поток, где много событий, в другой агрегированный поток, где у нас идет разделение по ключам и событиям соответствует гораздо меньше записей.

Время машины и время потока

Вроде бы с агрегацией разобрались, но упустили одну важную вещь. Если мы открываем окна по текущему времени, то может получиться, что подсчет агрегации будет вестись по событиям, которые на самом деле происходили довольно давно. Например, мы накопили поток, остановили все на час-два и хотим получить аналитику, запустив и обработав этот поток через два часа. Окна начнут открываться по текущему времени, хотя события были сгенерированы гораздо раньше. Получится, что в текущем окне по текущему времени произошли какие-то события, а в окнах, которые были старше, этих событий нет. Итоговая аналитика в этой ситуации будет неверной: окажется, что в более раннее время ничего не было, и все произошло прямо во время обработки.

В события нужно обязательно закладывать время, когда они случились, когда были записаны в поток. Чтобы избежать этого, нужно понять, что окна и время в потоке — это совсем не текущее время на нашей машине. И тогда окна нужно открывать не на основании текущего времени, а синхронизировать их с временем прохождения событий.

Один из популярных способов — вставлять в поток метки времени — вотермарки (watermarks). Для корректной работы нужно понимать, как течет время в потоке.

Это решит нашу проблему. Если вотермарки совпадают с окнами агрегации, то мы можем ориентироваться, когда закрывать старое и открывать новое окно. В этом случае нужно вставлять их чаще и учитывать время событий, чтобы понимать, куда поток продвинулся. Бывают ситуации, что вотермарки не выровнены — например, мы не знаем шаг агрегации и т.д.

Снэпшоты должны быть привязаны не к физическому времени, а к времени потока. Вотермарки также полезны при создании time-based снэпшотов хранилищ. Как только сообщения быстро обрабатываются, время ускоряется, и наоборот. Реальное время течет равномерно, время в потоке — нет.

Есть несколько вариантов: Теперь надо решить: кто и когда должен генерировать вотермарки.

Неплохой вариант. Кто записывает данные в поток, тот и генерирует. Или можно сделать компонент, который при получении событий сам себе генерирует вотермарки. Помимо системы-источника, вотермарки может вставлять сама система лога. Все это называется семантикой времени потока.

Но что если мы записали вотермарк и потом тут же забросили в поток события, которые произошли раньше этого вотермарка? Мы знаем, что после вотермарка нельзя записать событие, которое наступило до него. По вотермарку закроется окно агрегации, и тут приходит событие, которое просто отскакивает от закрытого окна. Это может иметь неприятные последствия. Представьте возможные последствия, когда речь идет о финансах.

Можно при закрытии окна дать какое-то время на поступление поздних событий. Как решить эту проблему? Но результат сгенерируется не сразу, а спустя время, которое отведено на прибытие поздних событий. В итоге вы получите более корректную агрегацию. Но в любом случае мы идем на компромисс между точностью и скоростью агрегации. Можно отправлять уточненную аналитику с учетом опоздавших событий дополнительным сообщением в поток агрегации.

Processing Time — текущее время на локальной машине. Вообще в нашем случае различают не два, а даже три вида времени. Ingestion Time – время поступления события в лог (поток при записи подставляет его). Event Time — время, когда событие было сгенерировано источником.

Но пришлось согласиться на eventual consistency. Итак, мы научились делать надежные приложения в ненадежных распределенных системах — обрабатывать потоки с нормальным масштабированием и отказоустойчивостью. Как здорово было бы проворачивать такие трюки со временем в реальной жизни. Можно в любой момент откатить поток в более раннее состояние и заново его прогнать, избегая ошибок.

Доступ к данным. Разгадка тайны дублей

Потоковая архитектура — это лишь одна из возможных в нашей IT-инфраструктуре. С потоками взаимодействуют другие системы, которые построены совсем на других принципах. Оценивая потоковую архитектуру со стороны, можно заметить несколько проблем:

  1. Потоковая система может обращаться наружу по принципу ad-hoc — запрашивать баланс пользователя и т.д. В момент реплея — когда в потоковой системе произошел откат — она не контролирует внешнее окружение. Один и тот же запрос баланса пользователя до реплея и после реплея могут дать разные результаты. Это повлияет на агрегацию и корректность алгоритмов работы потоковой системы. При каждом реплее результаты могут меняться, в зависимости от того, как изменились внешние данные.
  2. Внешняя система обращается к потоковой за какими-то данными. Внешняя система может получить данные от потоковой системы по запросу или через push. А после реплея — получить второй push, возможно, с другими результатами. Из-за этого мы и получали в Кафке дубли — она была предназначена для потоковых архитектур, а мы с ней работали в классической.

Подумаем, как организовать доступ к данным потоковой архитектуры — данным в базах данных обработчиков событий. Есть несколько подходов.

Queryable state — запрос снаружи о состоянии потокового кластера. 1. Знакомый всем веерный query.

Ему поступает запрос, SQL или какой-нибудь еще. Схема стандартна, за исключением одного компонента — это Query Controller. Каждый обработчик смотрит, что он может вернуть, и дает ответ в query controller. Контроллер дублирует запрос во все существующие обработчики. Эта схема реализует один полезный паттерн — CQRS, Change Query Request Segregation, то разделение каналов модификации и чтения сущностей. А он агрегирует результаты и возвращает ответ тому, кто сделал запрос. Изменения идут через потоки, а чтение — совсем по другому каналу, через Query Controller.

При запросе веером вы получите ответ не раньше чем ответит самый медленный обработчик, и по нему будет определяться латентность. Не смешивать сущности полезно, но есть несколько нюансов. Нужно либо усиливать его, либо разбивать на несколько более быстрых. Если не усиливать самый медленный обработчик, система не может стать быстрее. Так что для конкретного роста мощностей, скажем, в два раза, придется проводить анализ состояния всего кластера, которое может зависеть от множества факторов. После этого все будет измеряться по второму самому медленному разработчику. Таков недостаток схемы queryable state.

Copy by request 2.

Если такие сведения есть, то можно закидывать запросы на выборку данных не сразу все обработчикам, а адресно — только тем, где нужные данные имеются. В queryable state мы не делали никаких предположений о том, в каких именно обработчиках могут храниться данные. То есть мы копируем все состояние обработчика или его часть по запросу. И сразу получать ответ. А можно подмешивать такие сообщения в поток изменений — тогда у нас получается всего один канал общения с потоковой системой. Направлять такие запросы и получать ответ можно как через queryable controller — здесь агрегация сводится к случаю, когда мы ждем один ответ. Для разных запросов используют разные способы.

Но нам нужно знать принцип разделения потока и понимать, как запрос отражается на локальных состояниях обработчиков. Теперь у нас нет проблемы с поиском самого медленного обработчика — мы четко знаем, где лежат данные и что надо улучшать, если экземпляр медленно отработал. Если это возможно — лучше использовать copy by request вместо веерного queryable state.

Continuous distribution 3.

К обработчикам приходят данные из партиций, они обрабатывают их и выкидывают результаты обработки в другой поток. Более нативный для потоковых систем способ — не что иное как потоковая репликация онлайн-данных. Получается, что в обработчиках у нас есть мастер-копия данных — key-value — которая выкидывается в поток. Остальные сервисы, которые интересуются этими данными, могут подписаться на этот поток, потреблять его и делать локальные копии данных у себя. Подобным образом в поточной архитектуре обеспечивается надежность, но здесь мы еще и бизнес-логику строим. Этот поток потребляется сервисами, которые создают у себя витрины.

Недостаток — вместе с мастер-копией во всем кластере может существовать много других копий данных, что потребует много ресурсов. Преимущества метода — надежность, отказоустойчивость за счет откатов при сбоях. Но зато когда какой-нибудь сервис захочет сделать что-то сложное, и в его копии окажутся необходимые данные другого сервиса, это позволит работать в локальной парадигме, без дополнительных транзакций и синхронизаций.

Как правило, потоковые архитектуры хороши именно там, где минусы continuous distribution не играют особой роли. Если требования к задержкам не слишком высоки и есть достаточно ресурсов для хранения копий данных, в потоковых системах рекомендуется использовать именно этот способ.

К чему мы пришли?

Подведем итоги приключения. Прямо по пунктам:

  • Kafka может заменить JMS в определенных условиях.
  • Kafka и потоковая обработка хороши для больших распределенных систем.
  • Eventual consistency — это неизбежная плата за использование потоковой обработки.
  • Все наши пайплайны с Кафкой и потоковой обработкой работают при некритичных требованиях к латентности, но зато обеспечивают масштабируемую производительную обработку

В новой парадигме мы уже запустили некоторые задачи, например, генерацию оповещений пользователей и мониторинг онлайн-платежей. Все работает замечательно. Сама по себе потоковая архитектура возникла недавно, и появится еще много вариантов реализации ее механизмов. Мы продолжаем ее изучать и, если будет интересно, еще расскажем о своих новых приключениях.

Теги
Показать больше

Похожие статьи

Добавить комментарий

Ваш e-mail не будет опубликован. Обязательные поля помечены *

Кнопка «Наверх»
Закрыть