[Перевод] Знакомство с реактивными потоками – для Java-разработчиков
Привет, Хабр!
Речь пойдет об Akka Streams и потоковой передаче данных в целом — в книге Роланда Куна этим вопросам посвящены главы 10 и 15-17.Реактивные потоки – это стандартный способ асинхронной обработки данных в потоковом стиле. Сегодня мы вернемся к одной из тем, затрагиваемых в нашей замечательной книге "Реактивные шаблоны проектирования". Следует отметить, что реактивные потоки – «просто» стандарт, а сами по себе ни на что не годятся. Они были включены в Java 9 как интерфейсы java.util.concurrent.Flow, а сейчас становятся настоящей палочкой-выручалочкой для создания потоковых компонентов в различных приложениях — и такая расстановка сохранится на протяжении ближайших лет. На практике используется та или иная конкретная реализация этого стандарта, и сегодня мы поговорим об Akka Streams – одной из ведущих реализаций реактивных потоков с момента их зарождения.
Итак, если взять два смежных шага и считать вышестоящий поставщиком, а следующий за ним – потребителем данных, то окажется, что поставщик может работать либо медленнее потребителя, либо быстрее его. Типичный конвейер потоковой обработки состоит из нескольких шагов, на каждом из которых информация передается на следующий шаг (то есть, по нисходящей). В таком случае потребитель может переполниться данными, которые ему приходится (в меру сил) аккуратно обрабатывать. Когда поставщик работает медленнее – все нормально, но ситуация осложняется, если потребитель не поспевает за поставщиком.
Именно так и поступают, например, при работе с сетевым оборудованием. Простейший способ справиться с избытком данных – взять и отбросить все, что не удается обработать. Тогда нам пригодится обратное давление (backpressure) Но что делать, если мы вообще ничего не хотим отбрасывать?
Поскольку важнейший аспект реактивного подхода – не допускать блокировок за исключением случаев, когда это совершенно необходимо, реализация обратного давления в реактивном потоке также должна быть неблокирующей. Идея обратного давления очень важна в контексте Reactive Streams и сводится к тому, что мы ограничиваем объем данных, передаваемых между соседними звеньями конвейера, поэтому ни одно звено не переполняется.
Это значит, что, просто добавив зависимость к org.reactivestreams:reactive-streams, мы просто топчемся на месте – нам все равно нужна конкретная реализация. Стандарт Reactive Streams определяет ряд интерфейсов, но не реализацию как таковую. Среди других реализаций можно упомянуть RxJava 2.x или Reactor и др. Существует множество реализаций Reactive Streams, и в этой статье мы воспользуемся Akka Streams и соответствующим DSL на основе Java.
Кроме того, мы хотим задать некий порог накопления агрегированных данных, по достижении которого будет инициироваться уведомление по электронной почте. Допустим, у нас есть каталог, в котором мы хотим отслеживать новые CSV-файлы, затем каждый файл обрабатывать по потоковому принципу, на лету выполнять кое-какую агрегацию, а собранные таким образом результаты отправлять на веб-сокет (в реальном времени).
Более того, мы хотим отправлять по электронной почте уведомление после каждого пятого значения, поступающего на веб-сокет. Мы хотим рассчитать среднее значение value для двух строк с общим id и отправлять его на веб-сокет лишь в том случае, если оно превышает 0.9. Наконец, мы хотим считывать и отображать данные, полученные с веб-сокета, и это будет делаться через тривиальный фронтенд, написанный на JavaScript.
рис. Мы собираемся использовать ряд инструментов из экосистемы Akka (см. Естественно, в центре всей системы будет находиться Akka Streams, которая позволяет обрабатывать данные в реальном времени по потоковому принципу. 1). Интересно, что, поскольку Akka Streams – это реактивные потоки, вся экосистема Alpakka также доступна и любой другой реализации RS – именно такой выигрыш в интероперабельности призваны достичь is RS-интерфейсы. Для считывания CSV-файлов мы воспользуемся Alpakka, это набор соединителей для интеграции Akka Streams с различными технологиями, протоколами или библиотеками. Самое приятное в данном случае — Akka HTTP бесшовно интегрируется с Akka Streams (которую, фактически, и использует «под капотом»), поэтому предоставить поток в качестве веб-сокета не составляет труда. Наконец, мы воспользуемся Akka HTTP, по которому предоставим конечную точку веб-сокета.
1. Рис. Обзор архитектуры
Никаких контейнеров и бинов, а лишь простое автономное приложение. Если сравнить эту схему с классической архитектурой Java EE, то, вероятно, заметно, что здесь все устроено гораздо проще. Более того, стек Java EE вообще не поддерживает потоковый подход.
Никаких вычислений там не производится. На базе этих компонентов определяем наш граф, который, в сущности – просто рецепт для обработки данных. Для этого вам понадобится так называемый материализатор, оптимизирующий определение графа и, в конечном итоге, запускающий его. Чтобы конвейер заработал, нам нужно материализовать граф, то есть, привести его в запускаемую форму. Он относится к так называемому «материализованному значению» — это значение, доступное извне графа (в противоположность типам ввода/вывода, доступным только для внутренней коммуникации между шагами графа – см. Однако, встроенный ActorMaterializer фактически безальтернативен, поэтому вы вряд ли будете пользоваться какой-либо иной реализацией.Если внимательно присмотреться к параметрам типов компонентов, то заметно, что каждый компонент (кроме соответствующих типов ввода/вывода) имеет таинственный тип Mat. 2). рис. Его можно сравнить с Void из Java, однако, семантически он чуть нагруженнее: в смысловом отношении «мы не используем этого значения» информативнее Void. Если вы предпочитаете игнорировать материализованное значение (а такое часто случается, если нас интересует всего лишь передача данных между шагами графа), то для обозначения такого варианта есть специальный параметр типа: NotUsed. Пожалуй, другие библиотеки Java в обоих этих случаях использовали бы Void, но в Akka Streams все типы стараются по максимуму наполнить полезной семантикой. Также отмечу, что в некоторых API используется схожий тип Done, сигнализирующий, что та или иная задача завершена.
2. Рис. Описание параметров типа Flow
Для начала определим граф Akka Streams, а потом по протоколу Akka HTTP соединим поток с веб-сокетом. Теперь давайте перейдем к конкретной реализации обработчика CSV.
Хотелось бы использовать для этого java.nio.file.WatchService, но, поскольку у нас потоковое приложение, нужно получить источник событий (Source) и с ним и работать, а не организовывать все через обратные вызовы. На входной точке нашего потокового конвейера мы хотим отслеживать, появились ли в интересующем нас каталоге новые CSV-файлы. К счастью, такой Source уже доступен в Alpakka в форме одного из соединителей DirectoryChangesSource, входит в состав alpakka-file, где «под капотом» используется WatchService:
private final Source<Pair<Path, DirectoryChange>, NotUsed> newFiles = DirectoryChangesSource.create(DATA_DIR, DATA_DIR_POLL_INTERVAL, 128);
Так получаем источник, выдающий нам элементы типа Pair<Path, DirectoryChange>. Мы собираемся отфильтровывать их так, чтобы подбирать лишь новые CSV-файлы, а затем передавать их «вниз». Для такого преобразования данных, а также для всех последующих мы будем пользоваться маленькими элементами, именуемыми Flow, из которых затем сложится полноценный конвейер обработки:
private final Flow<Pair<Path, DirectoryChange>, Path, NotUsed> csvPaths = Flow.<Pair<Path, DirectoryChange>>create() .filter(this::isCsvFileCreationEvent) .map(Pair::first); private boolean isCsvFileCreationEvent(Pair<Path, DirectoryChange> p) { return p.first().toString().endsWith(".csv") && p.second().equals(DirectoryChange.Creation);
}
Можно создать Flow, к примеру, при помощи обобщенного метода create()— он полезен, когда сам входной тип обобщенный. Здесь результирующий поток будет порождать (в виде Path) каждый новый CSV-файл, появляющийся в DATA_DIR.
Чтобы превратить источник в другой источник, можно воспользоваться одним из методов flatMap*. Теперь мы собираемся преобразовать Path-ы в строки, потоком выбираемые из каждого файла. В данном случае мы остановимся на flatMapConcat, поскольку хотим сохранить порядок строк, так, чтобы строки с одинаковыми id остались рядом друг с другом. В обоих случаях мы создаем Source из каждого входящего элемента и каким-либо образом комбинируем несколько получившихся источников в новый, цельный, сцепляя или сливая исходные источники. Чтобы преобразовать Path в поток байт, воспользуемся встроенной утилитой FileIO:
private final Flow<Path, ByteString, NotUsed> fileBytes = Flow.of(Path.class).flatMapConcat(FileIO::fromPath);
На этот раз воспользуемся методом of() для создания нового потока – он удобен, когда входной тип не является обобщенным.
В данном случае мы хотим разобрать поток байт как CSV-файл – и для этого вновь воспользуемся одним из модулей Alpakka, на этот раз alpakka-csv: Показанный выше ByteString – это представление последовательности байт, принятое в Akka Streams.
private final Flow<ByteString, Collection<ByteString>, NotUsed> csvFields = Flow.of(ByteString.class).via(CsvParsing.lineScanner());
Обратите внимание на используемый здесь комбинатор via, позволяющий прикрепить произвольный Flow к выводу, полученному на другом шаге графа (Source или другой Flow). В результате получается поток элементов, каждый из которых соответствует полю в отдельно взятой строке CSV-файла. Затем их можно преобразовать в модель нашей предметной области:
class Reading
}
Для преобразования как такового используем метод map и передаем ссылку на метод Reading.create:
private final Flow<Collection<ByteString>, Reading, NotUsed> readings = Flow.<Collection<ByteString>>create().map(Reading::create);
На следующем этапе мы должны сложить показания в пары, вычислить для каждой группы среднее значение value и передавать информацию далее лишь при достижении определенного порога. Поскольку нам требуется, чтобы среднее вычислялось асинхронно, мы воспользуемся методом mapAsyncUnordered, выполняющим асинхронную операцию с заданным уровнем параллелизма:
private final Flow<Reading, Double, NotUsed> averageReadings = Flow.of(Reading.class) .grouped(2) .mapAsyncUnordered(10, readings -> CompletableFuture.supplyAsync(() -> readings.stream() .map(Reading::getValue) .collect(averagingDouble(v -> v))) ) .filter(v -> v > AVERAGE_THRESHOLD);
Определив вышеуказанные компоненты, мы готовы сложить из них цельный конвейер (при помощи уже знакомого вам комбинатора via). Это совершенно не сложно:
private final Source<Double, NotUsed> liveReadings = newFiles .via(csvPaths) .via(fileBytes) .via(csvFields) .via(readings) .via(averageReadings);
Примечание
Предоставлять источник показаний как веб-сокет,
Выдавать тривиальную веб-страницу, которая подключается к веб-сокету и отображает полученные данные.
Ничего не стоит создать веб-сервер при помощи Akka HTTP: нужно просто унаследовать HttpApp и предоставить требуемые отображения по DSL маршрута:
class Server extends HttpApp { private final Source<Double, NotUsed> readings; Server(Source<Double, NotUsed> readings) { this.readings = readings; } @Override protected Route routes() { return route( path("data", () -> { Source<Message, NotUsed> messages = readings.map(String::valueOf).map(TextMessage::create); return handleWebSocketMessages(Flow.fromSinkAndSourceCoupled(Sink.ignore(), messages)); } ), get(() -> pathSingleSlash(() -> getFromResource("index.html") ) ) ); }
}
Здесь определяется два маршрута: /data, то есть, конечная точка веб-сокета, и / по которому выдается тривиальный фронтенд. Уже понятно, насколько просто предоставить Source из Akka Streams в качестве конечной точки веб-сокета: берем handleWebSocketMessages, задача которого – усовершенствовать HTTP-соединение до соединения с веб-сокетом и организовать там поток, в котором будут обрабатываться входящие и исходящие данные.
В данном случае мы хотим игнорировать входящие данные и создаем такой поток, «входящая» сторона которого заведена в Sink.ignore(). WebSocket моделируется в виде потока, то есть, на клиент посылаются исходящие и входящие сообщения. Все, что приходится сделать с числами double, в виде которых представлены средние – преобразовать каждое из них в TextMessage, это применяемая в Akka HTTP обертка для данных веб-сокета. «Исходящая» сторона потока обработчика веб-сокета просто связана с нашим источником, из которого поступают средние значения. Все элементарно делается при помощи уже знакомого нам метода map.
Server server = new Server(csvProcessor.liveReadings);
server.startServer(config.getString("server.host"), config.getInt("server.port"));
Фронтенд
В этом коде используется синтаксис ES6, который должен нормально выполняться в любом современном браузере. Чтобы получать данные с веб-сокета и отображать их, воспользуемся совершенно простым кодом на JavaScript, который просто прикрепляет полученные значения к textarea.
let ws = new WebSocket("ws://localhost:8080/data");
ws.onopen = () => log("WS connection opened");
ws.onclose = event => log("WS connection closed with code: " + event.code);
ws.onmessage = event => log("WS received: " + event.data);
Метод log прикрепляет сообщение к textarea, а также ставит метку времени.
запустить сервер (sbt run),
перейти в браузере на localhost:8080 (или к выбранным вами хосту/порту, если вы изменили умолчания),
скопировать один или несколько файлов из src/main/resources/sample-data в каталог data в корне проекта (если вы не меняли csv-processor.data-dir в конфигурации),
смотреть, как данные выводятся в логах сервера и в браузере.
Добавляем почтовый триггер
Он должен работать «сбоку», чтобы не нарушать передачу основных элементов. Последний штрих в нашем приложении – побочный канал, в котором мы будем имитировать почтовые оповещения, отсылаемые после поступления на веб-сокет каждого пятого элемента.
Первая просто подает значения на веб-сокет, а вторая контролирует, когда истекут очередные 5 секунд, и отправляет уведомление по электронной почте – см. Чтобы реализовать такое поведение, воспользуемся более продвинутой возможностью Akka Streams — языком Graph DSL – на котором напишем наш собственный шаг графа, на котором поток разветвляется на две части. 3. рис.
3. Рис. Наш собственный шаг графа для отправки сообщений по электронной почте
Также напишем наш собственный уловитель — Mailer: Мы будем использовать встроенный шаг Broadcast, на котором наш ввод высылается на набор объявленных выводов.
private final Graph<FlowShape<Double, Double>, NotUsed> notifier = GraphDSL.create(builder -> { Sink<Double, NotUsed> mailerSink = Flow.of(Double.class) .grouped(EMAIL_THRESHOLD) .to(Sink.foreach(ds -> logger.info("Sending e-mail") )); UniformFanOutShape<Double, Double> broadcast = builder.add(Broadcast.create(2)); SinkShape<Double> mailer = builder.add(mailerSink); builder.from(broadcast.out(1)).toInlet(mailer.in()); return FlowShape.of(broadcast.in(), broadcast.out(0));
});
Начинаем создавать наш собственный шаг графа с метода GraphDSL.create(), где предоставляется экземпляр построителя графа, Builder – он используется для манипуляций со структурой графа.
Для каждой такой группы сымитируем побочный эффект: уведомление, поступающее по электронной почте. Далее определим наш собственный уловитель, где применяется grouped для объединения входящих элементов в группы произвольного размера (по умолчанию 5), после чего эти группы отправляются вниз.
Также добавляем шаг Broadcast с двумя выходами. Определив наш собственный уловитель, можем использовать экземпляр builder, чтобы добавить его к графу.
Ввод написанного нами шага будет напрямую соединен с выходом шага Broadcast. Далее нужно указать соединение между элементами графа – один из выходов шага Broadcast мы хотим соединить с уловителем электронной почты, а другой – сделать выходом для написанного нами шага графа.
Однако, этот момент проверяется материализатором во время выполнения, поэтому висящих элементов на входе или на выходе не будет. Примечание 1Компилятор не может определить, правильно ли соединены все части графа.
Здесь имеем дело с формой Flow, то есть, у нас один вход и один выход. Примечание 2В данном случае можно заметить, что все написанные нами шаги имеют вид Graph, где S – форма, определяющая число и типы входов и выходов, а M – материализованное значение (если таковое имеется).
private final Source<Double, NotUsed> liveReadings = newFiles .via(csvPaths) .via(fileBytes) .via(csvFields) .via(readings) .via(averageReadings) .via(notifier);
Запустив обновленный код, вы увидите, как в логе появляются записи о почтовых уведомлениях. Уведомление отправляется всякий раз, когда через веб-сокет успевает пройти еще пять значений.
Это альтернатива традиционному подходу, применяемому на Java EE. В этой статье мы изучили общие концепции потоковой обработки данных, узнали, как при помощи Akka Streams построить легковесный конвейер обработки данных.
Также было показано, как использовать Alpakka для потоковой подачи данных из файловой системы и протокол Akka HTTP, позволяющий создать простой веб-сервер с веб-сокетом на конечной точке, бесшовно интегрируемый с Akka Streams. Мы рассмотрели, как использовать некоторые шаги обработки, встроенные в Akka Streams, как написать собственный шаг на языке Graph DSL.
В нем есть несколько дополнительных log-шагов, расставленных в разных точках. Полноценный рабочий пример с кодом из этой статьи находится на GitHub. В статье я специально их опустил, чтобы она получилась покороче. Они помогают точнее представить, что происходит внутри конвейера.