Главная » Хабрахабр » Асинхронное программирование на примерах: реконструкция методов java.util.concurrent.CompletableFuture

Асинхронное программирование на примерах: реконструкция методов java.util.concurrent.CompletableFuture

Для чего нужна реконструкция, если исходный код этого класса открыт?

Хотя бы потому, что там под капотом высокооптимизированный, трудночитаемый код, изучение которго мало что дает в педагогическом плане.

Поэтому мы воссоздадим семантику операций по их спецификациям, и напишем функционально эквивалентный, понятный и читаемый код, хотя возможно, не самый экономный по расходу памяти и времени процессора.

Начнем с относительно простого метода:

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor with the value obtained by calling the given Supplier.
Type Parameters:
U - the function's return type
Parameters:
supplier - a function returning the value to be used to complete the returned CompletableFuture
executor - the executor to use for asynchronous execution
Returns:
the new CompletableFuture

Прочитаем внимательно спецификацию:

Returns a new CompletableFuture

То есть, создается объект типа CompletableFuture либо его подкласса и возвращается в качестве результата.

that is asynchronously completed by a task running in the given executor`

Кроме того, создается задача, исполняемая на Executor'e.
Как мы знаем, Executor'ы принимают только объекты типа Runnable.

Runnable это интерфейс, и первый объект вполне может его реализовывать — так мы совместим две функции в одном объекте.

completed ... with the value obtained by calling the given Supplier.

Этот Runnable должен вызвать данный Supplier и полученным значением завершить созданный CompletableFuture.

Supplier — это функция без параметров, так что закодировать все это очень просто:

class CompletableFutureForSupplyAsync<U> extends CompletableFuture<U> implements Runnable public void run() { try { U result = supplier.get(); super.complete(result); } catch (Throwable e) { super.completeExceptionally(e); } } } public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) { CompletableFutureForSupplyAsync<U> task = new CompletableFutureForSupplyAsync<>(supplier); executor.execute(task); return task; }

Следующий пример несколько сложнее:

public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
Returns a new CompletionStage that, when this stage completes normally, is executed using the supplied Executor, with this stage's result as the argument to the supplied function. See the CompletionStage documentation for rules covering exceptional completion.
Specified by:
thenApplyAsync in interface CompletionStage<T>
Type Parameters:
U - the function's return type
Parameters:
fn - the function to use to compute the value of the returned CompletionStage
executor - the executor to use for asynchronous execution
Returns:
the new CompletionStage

is executed using the supplied Executor Returns a new CompletionStage that...

Здесь нам прямо предлагают оформить создаваемый объект оформить в виде Runnable.

with this stage's result as the argument to the supplied function. ...

Передаваемая нам функция имеет параметр, и значением этого параметра служит значение, завершающее текущий CompletionStage. а вот это уже интереснее. Вместо этого мы должны договориться с текущим CompletionStage,
чтобы в момент своего завершения она передала свое значение в задачу. В момент вызова thenApplyAsync это значение может быть неизвестно, поэтому сразу запустить задачу на Executor мы не можем. Среди многочесленных методов CompletionStage имеются один, в точности подходщий для этой цели, это whenComplete:

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
Returns a new CompletionStage with the same result or exception as this stage, that executes the given action when this stage completes.

То есть, во вновь создаваемом объекте-задаче достаточно реализовать еще интерфейс BiConsumer для приема аргумента:

class CompletableFutureForApplyAsync<T, U> extends CompletableFuture<U> implements Runnable, BiConsumer<T,Throwable> { Function<? super T,? extends U> fn; Executor executor; T arg; Throwable throwable; public CompletableFutureForApplyAsync(Function<? super T,? extends U> fn, Executor executor) { this.fn = fn; this.executor = executor; } @Override // implementation of BiConsumer interface public void accept(T argument, Throwable throwable) { if (throwable != null) { this.throwable = throwable; } else { this.arg = argument; } executor.execute(this); } @Override public void run() { if (throwable == null) { try { U result = fn.apply(arg); super.complete(result); } catch (Throwable e) { super.completeExceptionally(e); } } else { super.completeExceptionally(throwable); } } } public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor ) { CompletableFutureForApplyAsync<T,U> task = new CompletableFutureForApplyAsync<>(fn, executor); this.whenComplete(task); return task; }
}

Этот пример очень важен для понимания природы асинхронного программирования, поэтому еще раз перечислим его основные шаги:

1) создается асинхронная процедура:

CompletableFutureForApplyAsync<T,U> task = new CompletableFutureForApplyAsync<>(fn, executor);

2) она пока не готова к исполнению, поэтому мы просим поставщика недостающего аргумента передать нам этот аргумент в будущем,
вызвав поданный нами метод:

this.whenComplete(task);

3) в этом методе мы не только сохраняем полученный аргумент, но и запускаем задачу на исполнение (см метод accept()).

Но исполнятся эта цепочка будет строго последовательно, без всякого параллелизма. 4) исполнение задачи сводится к выполнению поданой нам функции и сохранении результата.
Этот реультат может быть точно также затребован другими процедурами с помощью метода whenComplete(), примененного уже к нашему вновь построенному объекту, так что мы можем построить цепочку асинхронных процедур произвольной длины.

A как же изобразить более сложную диаграмму вычислений, содержащую параллельные ветви?
Для этого служит метод thenCombineAsync.

Если в предыдущем примере мы запускали асинхронную процедуру с одним аргументом, то в этом — с двумя.

При этом вычисление обоих аргументов может происходить параллельно.

ublic <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
Description copied from interface: CompletionStage
Returns a new CompletionStage that, when this and the other given stage complete normally,
is executed using the supplied executor, with the two results as arguments to the supplied function.

extends U> other, являющийся асинхронным поставщиком второго аргумента. Здесь все как в предыдущем примере с thenApplyAsync, но параметр-функция уже от двух аргументов, и добавлен параметр CompletionStage<?

Как же нам обеспечить обработку второго аргумента?

Ну во-первых, вместо одной переменной T arg описать две: T arg1; U arg2;, a вместо одного метода public void accept(T argument, Throwable throwable) описать два — accept1 и accept2,
каждый из которых работает со своим аргументом.

При этом наш строящийся объект уже не имплементирует интерфейс BiConsumer<T,Throwable> и мы не можем уже написать ключевое предложение для связи узлов графа асинхронных вычислений

this.whenComplete(task);

К счастью, объект функционального интерфейса может быть изображен ссылкой на метод, без заключения его в отдельный класс:

this.whenComplete(task::accept1); other.whenComplete(task::accept2);

То есть, текущий объект поставляет первый аргумент, а объект other — второй.

Вот только коды методов придется изменить, чтобы он не запускали задачу при приходе своего аргумента, но также проверяли поступление и второго:

public synchronized void accept1(T argument, Throwable throwable) { if (throwable != null) { this.throwable = throwable; executor.execute(this); } else { this.arg1 = argument; if (arg2 != null) { executor.execute(this); } } }

Аналогично описывается метод accept2.

Отметим, что:

  • методы стали синхронизованными (работаем с общими данными)
  • в случае передачи ошибки ждать второго аргумента не нужно.
  • проверка поступления аргумента сравнением на null — не самый лучший способ, может, надо завести на каждый аргумент булевскую переменную.

Таким способом можно сделать асинхронные процедуры и от большего числа аргументов, чем два, но сразу приходит мысль — может все же сделать отдельный класс для параметров, чтобы не писать для приема каждого параметра свой метод, а обходится динамическим созданием параметров?

Parameter<Integer> arg1 = new Parameter<>(); Parameter<Float> arg2 = new Parameter<>(); ... future1.whenComplete(arg1); future2.whenComplete(arg2);

Да, такой класс создать можно, но об этом в следующий раз.

Краткое резюме из вышеизложенного:

  • асинхронная программа — это сеть связанных между собой асинхронных процедур,
    точно также, как мультипоточная программа — сеть связанных между собой потоков исполнения (threds).

Но средства связи потоков и асинхронных процедур кардинально отличаются.

Потоки связываются с помощью семафоров, блокирующих очередей и прочих подобных объектов,
которые блокируют поток получателя, если информация еще не поступила, но поток уже пытается ее извлечь с помощью pull-based операции.

Асинхронные процедуры — получатели просто не заходят на исполнение, пока на будет готова вся необходимая им информация.

Они пассивно ждут, пока поставщики информации сами не передадут ее с помощью push-based операция.

Благодаря этому они не тратят память на стек во время ожидания, и, следовательно, занимают гораздо меньше памяти, чем потоки исполнения.

  • построение сети асинхронных программ сводится к созданию объектов и связыванию их между собой.

Набор методов CompletableFuture ровно это и делает, и в принципе, без них можно обойтись, создавая объекты явно, как показано в приведенных выше примерах.

Но для этого надо иметь классы, аналогичные тем, что были описаны в этих примерах.
По какой-то причине создатели java.util.concurrent предпочли не давать пользователям доступ к этим классам и скрыли их в глубине кода CompletableFuture.

Исходный код примеров доступен на Гитхабе. Те, кто хочет иметь перед глазами наглядное представление создаваемой асинхронной сети, могут реконструировать эти классы, продолжив приведенные примеры.


Оставить комментарий

Ваш email нигде не будет показан
Обязательные для заполнения поля помечены *

*

x

Ещё Hi-Tech Интересное!

[Из песочницы] LVEE — самая неформальная и душевная ИТ-тусовка

О событии Под Минском завершилась ежегодная конференция LVEE 2018 (Linux Vacation/Eastern Europe), организованная белорусской группой пользователей Linux еще в 2005 году. В мероприятии принимали участие более сотни людей из 7 стран, а программу составили 23 доклада, лайтнинги, воркшоп, круглый стол, ...

Губительная ошибка новичков в геймдеве

Перед началом любого дела необходимо составить план, сделать «пробы пера», одним словом — черновик. Именно это помогает определить стартовую точку и понять направление движения. Не хотите тратить тонны усилий впустую? Хотите делать быстрее и качественней остальных? 90% начинающих разработчиков этого ...