Хабрахабр

Reactor, WebFlux, Kotlin Coroutines, или Асинхронность на простом примере

Их задачи сводятся к запросам к другим базам/сервисам/кешам и агрегации всех этих данных по различным правилам и разнообразной бизнес-логике. Многие сервисы в современном мире, по большей части, «ничего не делают». Поэтому неудивительно, что появляются такие языки, как Golang, с удобной встроенной конкурентной системой, позволяющей легко организовывать неблокирующий код.

Есть огромное количество фреймворков и библиотек, блокирующих потоки при использовании. В JVM-мире всё немного сложнее. Да и в Java нет аналогичного механизма, похожего на горутины в Golang. Так и сама stdlib может делать то же самое порой.

Есть Kotlin с корутинами, которые по своему использованию очень похожи на горутины из Golang (хоть и реализованы совершенно по-другому). Тем не менее, JVM активно развивается и появляются новые интересные возможности. Один из самых популярных веб-фреймворков — Spring — не так давно добавил возможность создавать полностью неблокирующие сервисы на Webflux. Есть JEP Loom, который в будущем привнесёт fibers в JVM. 2 интеграция с Kotlin стала ещё лучше. А с недавним релизом Spring boot 2.

2 и Kotlin для интеграции с несколькими внешними сервисами. Предлагаю на примере небольшого сервиса по переводу денег с одной карты на другую самим написать приложение на Spring boot 2.

Но если нет — не беда. Хорошо, если вы уже знакомы с Java, Kotlin, Gradle, Spring, Spring boot 2, Reactor, Webflux, Tomcat, Netty, Kotlin Сoroutines, Gradle Kotlin DSL или даже имеете степень доктора наук. Код будет максимально упрощён, и даже если вы не из мира JVM, надеюсь, вам будет всё понятно.

Если вы планируете сами написать сервис, убедитесь, что всё необходимое установлено:

  • Java 8+;
  • Docker и Docker Compose;
  • cURL и желательно jq;
  • Git;
  • желательно IDE для Kotlin (Intellij Idea, Eclipse, VS, vim и т.п.). Но можно и в блокноте.

Сначала запустим установку и сборку и подробнее рассмотрим сервисы и их API. Примеры будут содержать как заготовки под реализацию в сервисе, так и уже написанную реализацию.

Сам пример сервисов и API сделан лишь для наглядности, не стоит переносить к себе в прод всё AS IS!

Сперва клонируем к себе репозиторий с сервисами, интеграцию с которыми будем делать, и переходим в директорию:

git clone https://github.com/evgzakharov/spring-demo-services && cd spring-demo-services

В отдельном терминале собираем все приложения с помощью gradle, где после успешной сборки все сервисы запустятся с помощью docker-compose.

./gradlew build && docker-compose up

Пока всё скачивается и устанавливается, рассмотрим проект с сервисами.

На вход сервиса (Demo service) будет поступать запрос с токеном, номерами карт для перевода и суммой, которую будем переводить между картами:

{ "authToken": "auth-token1", "cardFrom": "55593478", "cardTo": "55592020", "amount": "10.1"
}

AUTH также будет возвращать нам информацию о том, к каким из трёх сервисов мы можем получить доступ. По токену authToken необходимо сходить в сервис AUTH и получить userId, с которым потом можно сделать запрос к USER и вытянуть всю дополнительную информацию по пользователю. Пример ответа от AUTH:

{ "userId": 158, "cardAccess": true, "paymentAccess": true, "userAccess": true
}

В ответ на запросы мы получим cardId, дальше с ними отправляем запрос в PAYMENT и делаем перевод. Для перевода между картами сначала идём с каждым номером карты в CARD. И последнее — ещё раз отправляем запрос в PAYMENT с fromCardId и узнаём текущий баланс.

И чтобы разнообразить ответы от AUTH, есть возможность варьировать значение SUCCESS_RATE, которое управляет вероятностью ответа true для сервиса. Чтобы эмулировать небольшую задержку в сервисах, во всех контейнерах пробрасывается значение переменной окружения TIMEOUT, в которой в миллисекундах задаётся задержка на ответ.

Файл docker-compose.yaml:

version: '3'
services: service-auth: build: service-auth image: service-auth:1.0.0 environment: - SUCCESS_RATE=1.0 - TIMEOUT=100 ports: - "8081:8080" service-card: build: service-card image: service-card:1.0.0 environment: - TIMEOUT=100 ports: - "8082:8080" service-payment: build: service-payment image: service-payment:1.0.0 environment: - TIMEOUT=100 ports: - "8083:8080" service-user: build: service-user image: service-user:1.0.0 environment: - TIMEOUT=100 ports: - "8084:8080"

Для всех сервисов делается проброс портов с 8081 по 8084, чтобы легко достучаться до них напрямую.

Сперва попробуем написать реализацию максимально «топорно», без асинхронности и параллелизма. Перейдём к написанию Demo service. 2. Для этого возьмём Spring boot 2. Клонируем репозиторий и переходим в ветку spring-mvc-start: 1, Kotlin и заготовку для сервиса.

git clone https://github.com/evgzakharov/demo-service && cd demo-service && git checkout spring-mvc-start

Controller. Переходим в файл demo. В нём есть единственный пустой метод processRequest, реализацию для которого необходимо написать.

@PostMapping fun processRequest(@RequestBody serviceRequest: ServiceRequest): Response

На вход в метод будет поступать запрос на перевод между картами.

data class ServiceRequest( val authToken: String, val cardFrom: String, val cardTo: String, val amount: BigDecimal
)

Для тех, кто не близко знаком с Spring

DemoController помечен специальной аннотацией RestController: она, помимо регистрации бина в DI, добавляет также его обработку как контроллера. В Spring есть встроенный DI, который работает на основе аннотаций. PostProcessor находит все методы, помеченные аннотацией PostMapping, и добавляет их в качестве endpoint у сервиса с методом POST.

В нашем случае это всего один аргумент, помеченный аннотацией @RequestBody. Обработчик также создаёт proxy-класс для DemoController, в котором в метод processRequest передаются все необходимые аргументы. Поэтому в proxy данный метод будет вызываться с содержимым JSON, десериализованым в класс ServiceRequest.

Методов всего пять, по одному под каждое действие. Чтобы было проще, все методы по интеграции с другими сервисами уже сделаны, нужно только их правильно соединить. Сами вызовы других сервисов реализованы на блокирующем вызове RestTemplate из Spring.

Пример метода для вызова AUTH:

private fun getAuthInfo(token: String): AuthInfo { log.info("getAuthInfo") return restTemplate.getForEntity("${demoConfig.auth}/{token}", AuthInfo::class.java, token) .body ?: throw RuntimeException("couldn't find user by token='$token'")
}

В комментариях отмечен порядок действий и какой ответ ожидается на выходе: Перейдём к реализации метода.

@PostMapping fun processRequest(@RequestBody serviceRequest: ServiceRequest): Response { //1) get auth info from service by token -> userId //2) find user info by userId from 1. //3) 4) find cards info for each card in serviceRequest // 5) make transaction for known cards by calling sendMoney(id1, id2, amount) // 6) after payment get payment info by fromCardId TODO("return SuccessResponse")
// SuccessResponse(
// amount = ,
// userName = ,
// userSurname = ,
// userAge =
// ) }

Попробуйте это сделать самостоятельно. Сначала реализуем метод максимально просто, без учёта, что AUTH может нам запретить доступ к другим сервисам. Когда получится (или после перехода в ветку spring-mvc), вы можете проверить работу сервиса следующим образом:

реализация из ветки spring-mvc

fun processRequest(@RequestBody serviceRequest: ServiceRequest): Response { val authInfo = getAuthInfo(serviceRequest.authToken) val userInfo = findUser(authInfo.userId) val cardFromInfo = findCardInfo(serviceRequest.cardFrom) val cardToInfo = findCardInfo(serviceRequest.cardTo) sendMoney(cardFromInfo.cardId, cardToInfo.cardId, serviceRequest.amount) val paymentInfo = getPaymentInfo(cardFromInfo.cardId) return SuccessResponse( amount = paymentInfo.currentAmount, userName = userInfo.name, userSurname = userInfo.surname, userAge = userInfo.age )
}

Запускаем сервис (из папки demo-service):

./gradlew bootRun

Отправляем запрос на endpoint:

./demo-request.sh

В ответ получаем что-то подобное:

➜ demo-service git:(spring-mvc) ✗ ./demo-request.sh
+ curl -XPOST http://localhost:8080/ -d @demo-payment-request.json -H 'Content-Type: application/json; charset=UTF-8'
+ jq . % Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed
100 182 0 85 100 97 20 23 0:00:04 0:00:04 --:--:-- 23
{ "amount": 989.9, "userName": "Vasia", "userSurname": "Pupkin", "userAge": 18, "status": true
}

И учитывая, что каждый из них отвечает с задержкой в 100 мс, общее время не может быть меньше 600 мс. В общей сумме нужно сделать 6 запросов, чтобы реализовать работу сервиса. Пока код совсем простой, и если мы сейчас захотим добавить проверку ответа AUTH для доступа к другим сервисам, то это будет несложно сделать (как и любой другой рефакторинг). В реальности получается примерно 700 мс с учётом всех накладных расходов.

Если не учитывать проверки ответа от AUTH, то у нас есть 2 независимые задачи: Но давайте подумаем, как можно ускорить выполнение запросов.

  • получение userId и запрос данных из USER;
  • получение cardId для каждой из карт, проведение платежа и получение итоговой суммы.

Тогда суммарное время выполнения будет зависеть от наиболее длинной цепочки вызовов (в данном случае второй) и будет суммарно выполняться за время 300 мс + X мс на накладные расходы. Эти задачи могут выполняться независимо друг от друга.

Можно под каждый вызов создать отдельный Thread, но это будет очень накладно. Учитывая, что сами вызовы у нас блокирующие, то единственный способ выполнить параллельные запросы — запускать их на отдельных потоках. С первого взгляда такое решение выглядит подходящим, и время действительно уменьшится. Другой способ — запуск задач на ThreadPool. Он позволяет запускать фоновые задачи, вызывая методы с постфиксом async. Например, мы можем выполнять запросы на CompletableFuture. Попробуйте сами написать реализацию или перейдите в ветку spring-mvc-async. И если при вызове методов не указывать конкретный ThreadPool, задачи будут запускаться на ForkJoinPool.commonPool().

Реализация из ветки spring-mvc-async

fun processRequest(@RequestBody serviceRequest: ServiceRequest): Response { val authInfoFuture = CompletableFuture.supplyAsync { getAuthInfo(serviceRequest.authToken) } val userInfoFuture = authInfoFuture.thenApplyAsync { findUser(it.userId) } val cardFromInfo = CompletableFuture.supplyAsync { findCardInfo(serviceRequest.cardFrom) } val cardToInfo = CompletableFuture.supplyAsync { findCardInfo(serviceRequest.cardTo) } val waitAll = CompletableFuture.allOf(cardFromInfo, cardToInfo) val paymentInfoFuture = waitAll .thenApplyAsync { sendMoney(cardFromInfo.get().cardId, cardToInfo.get().cardId, serviceRequest.amount) } .thenApplyAsync { getPaymentInfo(cardFromInfo.get().cardId) } val paymentInfo = paymentInfoFuture.get() val userInfo = userInfoFuture.get() log.info("result") return SuccessResponse( amount = paymentInfo.currentAmount, userName = userInfo.name, userSurname = userInfo.surname, userAge = userInfo.age )
}

По сравнению с первоначальным вариантом, суммарное время уменьшилось почти в 2 раза. Если сейчас измерить время запроса, оно будет в районе 360 мс. И если мы тут захотим добавить проверку ответа от AUTH, то и это сделать несложно. Сам код немного усложнился, но пока его всё так же несложно видоизменять.

Скажем, около 1000 одновременных запросов? Но что если у нас большое количество входящих запросов на сам сервис? И мы приходим к тому, что текущий вариант тоже не устраивает. При таком подходе довольно быстро получится, что все потоки ThreadPool заняты выполнением блокирующих вызовов.

Можно изменить запросы и сделать их неблокирующими. Остаётся только что-то сделать с самими вызовами сервисов. При таком подходе нам не нужно делать вызовы на отдельных потоках — достаточно будет одного (или по крайней мере маленького отдельного пула потоков), который мы уже заняли под обработку запросов. Тогда методы по вызову сервисов будут возвращать CompletableFuture, Flux, Observable, Deferred, Promise или аналогичный объект, на котором можно построить цепочку ожиданий.

Чтобы ответить на этот вопрос, внимательно посмотрим на Tomcat, который используется в Spring boot 2. Сможем ли мы теперь выдерживать на сервисе большую нагрузку? 1 в стартере org.springframework.boot:spring-boot-starter-web. 2. И при отсутствии свободных потоков новые запросы будут становиться «в очередь» ожидания. Он построен так, что под каждый входящий запрос выделяется поток из ThreadPool на его обработку. Выделять под это целый поток и блокировать его, пока не придут ответы от всех, выглядит, мягко говоря, излишним. Но сам наш сервис только лишь рассылает запросы в другие сервисы.

Для этого потребуется только сменить стартер spring-boot-starter-web на spring-boot-starter-webflux и немного изменить метод для обработки запросов, в котором запрос и ответ будут «обёрнуты» в Mono. К счастью, недавно в Spring появилась возможность использовать неблокирующий веб-сервер на базе Netty или Undertow. Это связано с тем, что Webflux построен на основе Reactor, и поэтому теперь в методе нужно построить цепочку из преобразований Mono.

Для этого перейдите в ветку spring-webflux-start. Попробуйте написать самостоятельно неблокирующую реализацию метода. В содержимое метода processRequest в комментарии вставлена реализация из первого примера. Обратите внимание, что изменился стартер для Spring Boot, где теперь используется версия с Webflux, и также изменилась реализация запросов к другим сервисам, которые переписаны на использование неблокирующего WebClient. Как и в прошлый раз, вначале сделайте версию без учёта проверок от AUTH, а потом посмотрите, насколько сложно их добавить: Попробуйте её самостоятельно переписать на Reactor.

fun processRequest(@RequestBody serviceRequest: Mono<ServiceRequest>): Mono<Response> {
// val authInfo = getAuthInfo(serviceRequest.authToken)
//
// val userInfo = findUser(authInfo.userId)
//
// val cardFromInfo = findCardInfo(serviceRequest.cardFrom)
// val cardToInfo = findCardInfo(serviceRequest.cardTo)
//
// sendMoney(cardFromInfo.cardId, cardToInfo.cardId, serviceRequest.amount)
//
// val paymentInfo = getPaymentInfo(cardFromInfo.cardId)
//
// log.info("result")
//
// return SuccessResponse(
// amount = paymentInfo.currentAmount,
// userName = userInfo.name,
// userSurname = userInfo.surname,
// userAge = userInfo.age
// ) TODO() }

После того как справились с этим, можете сравнить с моей реализацией из ветки spring-webflux:

Реализация из ветки spring-webflux

fun processRequest(@RequestBody serviceRequest: Mono<ServiceRequest>): Mono<Response> { val cacheRequest = serviceRequest.cache() .publishOn(Schedulers.parallel()) val userInfoMono = cacheRequest.flatMap { getAuthInfo(it.authToken) }.flatMap { findUser(it.userId) } val cardFromInfoMono = cacheRequest.flatMap { findCardInfo(it.cardFrom) } val cardToInfoMono = cacheRequest.flatMap { findCardInfo(it.cardTo) } val paymentInfoMono = cardFromInfoMono.zipWith(cardToInfoMono) .flatMap { (cardFromInfo, cardToInfo) -> cacheRequest.flatMap { request -> sendMoney(cardFromInfo.cardId, cardToInfo.cardId, request.amount).map { cardFromInfo } } }.flatMap { getPaymentInfo(it.cardId) } return userInfoMono.zipWith(paymentInfoMono) .map { (userInfo, paymentInfo) -> log.info("result") SuccessResponse( amount = paymentInfo.currentAmount, userName = userInfo.name, userSurname = userInfo.surname, userAge = userInfo.age ) }
}

И если мы захотим добавить «забытые» проверки от AUTH, то это будет не так просто сделать. Согласитесь, что теперь написать реализацию (по сравнению с предыдущим блокирующим подходом) стало сложней.

Он прекрасно подходит для построения неразветвлённых цепочек обработок. В этом вся суть реактивного подхода. Но если появляется ветвление, то код становится уже не таким простым.

К тому же существует большое количество написанных обёрток для Reactor, CompletableFuture и т.п. Помочь тут могут корутины из Kotlin, которые прекрасно дружат с любым асинхронным/реактивным кодом. Но даже если вы не найдёте нужную, её всегда можно написать самостоятельно, используя специальные билдеры.

Для этого перейдём в ветку spring-webflux-coroutines-start. Давайте самостоятельно перепишем реализацию на корутины. В ней в build.gradle.kts добавляются нужные зависимости:

implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinCoroutinesVersion")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactive:$kotlinCoroutinesVersion")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:$kotlinCoroutinesVersion")

И немного меняется метод processRequest:

suspend fun processRequest(@RequestBody serviceRequest: ServiceRequest): Response = coroutineScope { //TODO() }

Учитывая, что в методе мы будем создавать дополнительные корутины, нам потребуется создать дочерний скоуп coroutineScope (для понимания причин создания дополнительного скоупа посмотрите пост Романа Елизарова о Structured concurrency). Он больше не нуждается в Mono и преобразуется просто в suspend-функцию (спасибо интеграции Spring и Kotlin). Они возвращают всё тот же Mono, на котором можно вызвать suspend-метод awaitFirst, чтобы «дождаться» результата выполнения запроса. Обратите внимание, что другие вызовы сервисов совсем не изменились.

Попробуйте самостоятельно написать реализацию метода processRequest или перейдите в ветку spring-webflux-coroutines: Если корутины для вас ещё новая концепция, то есть замечательный гайд c подробным описанием.

реализация из ветки spring-webflux-coroutines

suspend fun processRequest(@RequestBody serviceRequest: ServiceRequest): Response = coroutineScope { log.info("start") val userInfoDeferred = async { val authInfo = getAuthInfo(serviceRequest.authToken).awaitFirst() findUser(authInfo.userId).awaitFirst() } val paymentInfoDeferred = async { val cardFromInfoDeferred = async { findCardInfo(serviceRequest.cardFrom).awaitFirst() } val cardToInfoDeferred = async { findCardInfo(serviceRequest.cardTo).awaitFirst() } val cardFromInfo = cardFromInfoDeferred.await() sendMoney(cardFromInfo.cardId, cardToInfoDeferred.await().cardId, serviceRequest.amount).awaitFirst() getPaymentInfo(cardFromInfo.cardId).awaitFirst() } val userInfo = userInfoDeferred.await() val paymentInfo = paymentInfoDeferred.await() log.info("result") SuccessResponse( amount = paymentInfo.currentAmount, userName = userInfo.name, userSurname = userInfo.surname, userAge = userInfo.age )
}

С корутинами не придётся заранее продумывать все точки ветвлений. Можно сравнить код с реактивным подходом. Код остаётся максимально похожим на первоначальный прямолинейный вариант, который совсем не сложно изменять. Мы можем просто в нужных местах вызывать методы await и «ответвлять» выполнение асинхронных задач в async. И немаловажным фактором является то, что корутины просто встраиваются в реактивный код.

В целом оба подхода решают свою задачу и можно использовать тот, что по душе. Возможно, даже для этой задачи реактивный подход вам нравится больше, но многие из опрошенных людей находят его более сложным. Правда, они ещё находятся в экспериментальной стадии, но уже сейчас можно посмотреть на текущую реализацию и попробовать у себя в коде. Кстати, с недавних пор в Kotlin появилась ещё и возможность создавать «холодные» корутины с Flow, которые во многом похожи на Reactor.

На этом хочу закончить и напоследок оставить полезные ссылки:

И, конечно, хочется верить, что вариант с корутинами вам нравится больше =) Надеюсь, вам было интересно и у вас получилось самостоятельно написать реализацию метода для всех способов.

Спасибо всем, кто дочитал до конца!

Теги
Показать больше

Похожие статьи

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *

Кнопка «Наверх»
Закрыть