Главная » Хабрахабр » Java и Project Reactor. Эпизод 2

Java и Project Reactor. Эпизод 2

Удивительно, но первая часть статьи даже кому-то понравилась.
Отдельное спасибо за ваши отзывы и комментарии. Привет! А если точнее, то о некоторых деталях работы Reactor. У меня для вас плохая хорошая новость: нам ещё есть о чём поговорить!

Я отрекаюсь от магии

Что же тщательно скрывается от нас за внешним слоем из Flux и Mono? Для дальнейшего углубления в Reactor не будет лишним описать некоторые принципы его работы.

Напоминание

Flux и Mono реализуют интерфейс Publisher.

public void subscribe(Subscriber<? super T> s);

Publisher выдаёт какие-то данные (материалы). Официальная документация предлагает сравнивать Reactor с конвейером. Данные идут по цепочке из операторов (конвейерной ленте), обрабатываются, в конце получается готовый продукт, который передаётся в нужный Consumer/Subscriber и употребляется уже там.

Рецепт усреднённый, потому что вариаций масса. Как работают операторы Reactor? Попытаемся дать грубое описание.

Вызов оператора у Flux/Mono возвращает объект, реализующий этот оператор. У каждого оператора есть какая-то тактика реализация в виде объекта. Например, вызов flatMap() вернёт объект типа FluxFlatMap (наследник Flux).

оператор — это Publisher, который, помимо какой-то своей логики, содержит ссылку на исходный (source) Publisher, к которому применяется. Т.е. Вызовы операторов создают цепочку из Publisher.

При вызове subscribe() создаётся исходный Subscriber, он передаётся назад по нашей цепочке из паблишеров, каждый Publisher может завернуть Subscriber в другой Subscriber, таким образом создавая ещё одну цепочку, которая и передаётся в исходный Publisher для выполнения.

Логично, что всё это несёт какой-то оверхед, поэтому рекомендуется воздержаться от написания обычного (синхронного) кода через Flux или Mono.

Schedulers | Планировщики

Разработчик самурай волен самостоятельно выбирать свою судьбу модель исполнения. Reactor не заботит модель исполнения вашей программы, но он любезно предоставляет необходимый для управления исполнением инструментарий.

планировщика). Модель исполнения и её детали определяются имплементацией интерфейса Scheduler (т.е. Есть статические методы для ряда случаев жизни, позволяющие указать контекст выполнения:

  • Schedulers.immediate(). Выполнение будет происходить в текущем потоке;
  • Schedulers.single(). Выполнение в выделенном потоке. Осторожно! Он и в самом деле single, обращение не создаст новый scheduler/поток, а вернёт кешированное значение. Для создания выделенного потока/scheduler на каждый вызов используйте Schedulers.newSingle();
  • Schedulers.elastic(). Уже упоминался в прошлой статье. Выполнение задач списывает на workers (работяг, «воркеров»), которых сам же и создаёт. В случае idle (бездействия) worker прибивается. В качестве воркера выступает ExecutorService. Используется для блокирующих задач, например I/O. По умолчанию — unbounded, если нужно ограничение на количество воркеров — используйте Schedulers.newElastic();
  • Schedulers.parallel(). N воркеров, оптимизированных для параллельной работы. По умолчанию N = количеству доступных ядер, т.е. Runtime.getRuntime().availableProcessors(). Осторожно! Внутри Docker этот метод может нагло вам врать.

Такое нововведение появилось в релизе 3. Стоит отметить, что коробочные Schedulers.single() и Schedulers.parallel() выбрасывают IllegalStateException при попытке запустить в них блокирующий оператор: block(), blockLast(), toIterable(), toStream(). 6. 1.

Но лучшей практикой для блокирующих операторов считается использование Schedulers.elastic() или Schedulers.newElastic(). Если всё-таки хотите заниматься подобными извращениями — используйте Shchedulers.newSingle() и Schedulers.newParallel().

Из Executor тоже можно, но не рекомендуется. Экземпляры Scheduler так же можно инициализировать из ExecutorService с помощью Schedulers.fromExecutorService().

К примеру, уже знакомый Flux.interval() по умолчанию запускается на Schedulers.parallel(). Некоторые операторы из Flux и Mono запускаются сразу на конкретном Scheduler (но можно передать и свой).

Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))

Контекст исполнения

Нужно прибегнуть к одному из уже знакомых нам операторов: Как же сменить контекст исполнения?

  • publishOn();
  • subscribeOn().

Они оба принимают Scheduler в качестве аргумента и позволяют изменить контекст выполнения на указанный Scheduler.
Но почему их два и в чём же разница?

Все последующие Subscriber будут выполняться в контексте указанного Scheduler. В случае с publishOn этот оператор применяется так же, как и любой другой, посреди цепочки вызовов.

После вызова subscribe() контекстом выполнения будет указанный Scheduler. В случае с subscribeOn оператор «глобальный», срабатывает сразу на всю цепочку Subscriber. Последующие вызовы subscribeOn игнорируются. Далее контекст может изменяться с помощью оператора publishOn.

Код вида Спасибо stackoverflow за пример.

Flux.just("a", "b", "c") //this is where subscription triggers data production //this is influenced by subscribeOn .doOnNext(v -> System.out.println("before publishOn: " + Thread.currentThread().getName())) .publishOn(Schedulers.elastic()) //the rest is influenced by publishOn .doOnNext(v -> System.out.println("after publishOn: " + Thread.currentThread().getName())) .subscribeOn(Schedulers.parallel()) .subscribe(v -> System.out.println("received " + v + " on " + Thread.currentThread().getName())); Thread.sleep(5000);

выведет следующий результат:

before publishOn: parallel-1
before publishOn: parallel-1
after publishOn: elastic-2
before publishOn: parallel-1
received a on elastic-2
after publishOn: elastic-2
received b on elastic-2
after publishOn: elastic-2
received c on elastic-2

Обработка ошибок

В Reactor исключения воспринимаются как terminal event (терминальное событие).
Если где-то произошло исключение, значит, что-то пошло не так, наш конвейер останавливается, а ошибка прокидывается до финального Subscriber и его метода onError.

Любимая картинка

Reactor не знает о серьёзности возникшего исключения и понятия не имеет, что с ним делать. Почему так? Для этого у Subscriber есть прекрасный метод onError(). Подобные ситуации должны как-то обрабатываться на уровне приложения. Reactor вынуждает нас его переопределять и как-то реагировать на исключение, в противном случае мы будем получать UnsupportedOperationException при ошибках.

Уточнение

Чтобы понять, что это действительно он, существует вспомогательный статический метод Errors.errorCallbackNotImplemented(Throwable t). Если быть честным, то выкидывает он наследника UnsupportedOperationException — ErrorCallbackNotImplemented.

Философия try/catch

Ну, не считая всеми любимых пустых catch-блоков. Что обычно делается внутри catch-блока в Java?

  1. Static Fallback Value. Вернуть какое-то статическое значение по умолчанию:

    try { return fromRemoteAndUnstableSource();
    } catch(Throwable e) { return DEFAULT_VALUE;
    }

  2. Fallback Method. Вызов альтернативного метода в случае ошибки:

    try { return fromRemoteAndUnstableSource();
    } catch(Throwable e) { return loadValueFromCache();
    }

  3. Dynamic Fallback Value. Вернуть какое-то динамическое значение в зависимости от исключения:

    try { return fromRemoteAndUnstableSource();
    } catch(Throwable e) return DEFAULT_VALUE;
    }

  4. Обернуть в какое-то доменное исключение и пробросить исключение дальше: Catch and Rethrow.

    try { return fromRemoteAndUnstableSource();
    } catch(Throwable e) { throw new BusinessException(e);
    }

  5. Залогировать ошибку и пробросить исключение дальше: Log or React on the Side.

    try { return fromRemoteAndUnstableSource();
    } catch(Throwable e) { logger.error(e.getMessage(), e); throw e;
    }

  6. Using Resources and the Finally Block. Освобождение ресурсов в finally-блоке или с помощью try-with-resources.

    try { return fromRemoteAndUnstableSource();
    } catch(Throwable e) { //do nothing
    } finally { cleanAllStuff();
    }

Приятная новость: всё это есть в Reactor в виде эквивалентных операторов.

Менее приятная новость: в случае ошибки ваша прекрасная последовательность данных всё равно завершится (terminal event), несмотря на оператора обработки ошибок.
Подобные операторы используются скорее для создания новой, резервной (fallback) последовательности на замену завершившейся.

Приведём пример:

Flux<String> s = Flux.range(1, 10) .map(v -> doSomethingDangerous(v)) .map(v -> doSecondTransform(v));
s.subscribe(value -> System.out.println("RECEIVED " + value), error -> System.err.println("CAUGHT " + error));

Можно сравнить это с похожим блоком try / catch:

try { for (int i = 1; i < 11; i++) { String v1 = doSomethingDangerous(i); String v2 = doSecondTransform(v1); System.out.println("RECEIVED " + v2); }
} catch (Throwable t) { System.err.println("CAUGHT " + t);
}

Обратите внимание: for прерывается!

Ещё пример завершения последовательности в случае ошибки:

Flux<String> flux = Flux.interval(Duration.ofMillis(250)) .map(input -> { if (input < 3) return "tick " + input; throw new RuntimeException("boom"); }) .onErrorReturn("Uh oh"); flux.subscribe(System.out::println);
Thread.sleep(2100);

На экране получим:

tick 0
tick 1
tick 2
Uh oh

Реализация try/catch

Static Fallback Value

Используя оператор onErrorReturn:

Flux.just(10) .map(this::doSomethingDangerous) .onErrorReturn("RECOVERED");

Можно добавить предикат, чтобы оператор выполнялся не для всех исключений:

Flux.just(10) .map(this::doSomethingDangerous) .onErrorReturn(e -> e.getMessage().equals("boom10"), "recovered10");

Fallback Method

Используя оператор onErrorResume,

Flux.just("key1", "key2") .flatMap(k -> callExternalService(k)) //загружаем данные извне .onErrorResume(e -> getFromCache(k)); //в случае ошибки берём из кеша

можно добавить предикат, чтобы оператор выполнялся не для всех исключений:

Flux.just("timeout1", "unknown", "key2") .flatMap(k -> callExternalService(k)) .onErrorResume(TimeoutException.class, getFromCache(k)) .onErrorResume((Predicate<Throwable>) error -> error instanceof UnknownKeyException, registerNewEntry(k, "DEFAULT"));

Аналогично:

Flux.just("timeout1", "unknown", "key2") .flatMap(k -> callExternalService(k)) .onErrorResume(error -> { if (error instanceof TimeoutException) return getFromCache(k); else if (error instanceof UnknownKeyException) return registerNewEntry(k, "DEFAULT"); else return Flux.error(error); });

Dynamic Fallback Value

Всё тот же onErrorResume:

erroringFlux.onErrorResume(error -> Mono.just( myWrapper.fromError(error); //тут, в зависимости от ошибки, будут совершены разные действия
));

Catch and Rethrow

Первый — с оператором onErrorResume: Можно сделать двумя способами.

Flux.just("timeout1") .flatMap(k -> callExternalService(k)) .onErrorResume(original -> Flux.error( new BusinessException("oops, SLA exceeded", original) );

И более лаконично — с помощью onErrorMap:

Flux.just("timeout1") .flatMap(k -> callExternalService(k)) .onErrorMap(original -> new BusinessException("oops, SLA exceeded", original));

Log or React on the Side

Добавить какой-то side effect (метрики, логирование) можно с помощью оператора doOnError

LongAdder failureStat = new LongAdder();
Flux<String> flux = Flux.just("unknown") .flatMap(k -> callExternalService(k)) .doOnError(e -> { failureStat.increment(); log("uh oh, falling back, service failed for key " + k); }) .onErrorResume(e -> getFromCache(k));

Using Resources and the Finally Block

На выручку нам приходит оператор Flux.using(). Итак, как же получить аналог try-with-resources или блок finally?

Он заставляет нас реализовать метод dispose(). Для начала нужно ознакомиться с интерфейсом Disposable. Вызовы метода должны быть идемпотентными. Вызов этого метода должен отменять или завершать какую-то задачу или последовательность. Использованные ресурсы должны быть освобождены.

AtomicBoolean isDisposed = new AtomicBoolean();
Disposable disposableInstance = new Disposable() { @Override public void dispose() { isDisposed.set(true); } @Override public String toString() { return "DISPOSABLE"; }
}; Flux<String> flux = Flux.using( () -> disposableInstance, // генерация данных disposable -> Flux.just(disposable.toString()), //обработка Disposable::dispose //освобождение ресурсов
);

Повторение | Retrying

При повторе (retry) наблюдается похожее поведение, оригинальная последовательность завершается (terminate event), мы повторно подписываемся (re-subscribing) на Flux.

Код Разберём на примере.

Flux.interval(Duration.ofMillis(250)) .map(input -> { if (input < 3) return "tick " + input; throw new RuntimeException("boom"); }) .elapsed() .retry(1) .subscribe(System.out::println, System.err::println); Thread.sleep(2100);

выведет

259,tick 0
249,tick 1
251,tick 2
506,tick 0
248,tick 1
253,tick 2
java.lang.RuntimeException: boom

Более сложная логика повторов доступна с использованием оператора retryWhen().

Заключение

Надеюсь, этой небольшой заметке удалось пролить свет на некоторые особенности работы Reactor.

Подведём итоги:

  • контекстом выполнения можно манипулировать с помощью операторов publishOn, subscribeOn и Schedulers;
  • для обработки исключительных ситуаций есть множество операторов на все случаи жизни;
  • посылание terminate signal приводит к завершению оригинальной «последовательности»;
  • для освобождения ресурсов используется интерфейс Dispose.

Спасибо за внимание!

По мотивам документации Reactor

Copies of this document may be made for your own use and for distribution to others, provided that you do not charge any fee for such copies and further provided that each copy contains this Copyright Notice, whether distributed in print or electronically.

и контрибьюторы / мейнтейнеры. Меня тут нет, но есть более достойные мужи, в т.ч.


Оставить комментарий

Ваш email нигде не будет показан
Обязательные для заполнения поля помечены *

*

x

Ещё Hi-Tech Интересное!

[Перевод] Что мне нравилось в Поле Аллене

Воспоминания Билла Гейтса о Поле Аллене, с которым они вместе, будучи ещё студентами, основали в 1975 году компанию «Microsoft» (название компании предложил именно Пол) Я хочу выразить свои сожаления его сестре Джоди, его семье и множеству его друзей и коллег ...

Где работать в ИТ #2: «СКБ Контур»

В конце октября ей исполняется 30 лет, количество всех сотрудников перевалило за 8 тысяч. «СКБ Контур» — одна из крупнейших и старейших ИТ компаний в России. По оценкам, собранным на сервисе оценки работодателей «Моего круга», в июле 2018 «Контур» разделил ...