Хабрахабр

Справочник по источникам событий в Rx

Они забывают о специализированных классах Single, Maybe и Completable, которые зачастую способны добавить больше ясности в код. RxJava используется в большом количестве android-приложений, но при этом многие не знают других источников событий, кроме Observable и, может быть, Flowable.

Он представляет собой операцию, которая может быть выполнена или нет. Под катом вас ждёт шпаргалка по источникам событий, которые существуют в RxJava.
Completable фактически является Rx-аналогом Runnable. Соответственно, для подписки на него необходимо реализовать onComplete и onError. Если проводить аналогию с Kotlin, то это fun completable() из мира Rx. Он не может быть создан из значения (Observable#just, ...), потому что не рассчитан на это.

Продолжая сравнение с Kotlin, можно сказать, что Single — это fun single(): T . Single — реактивный Callable, потому что тут появляется возможность вернуть результат операции. Таким образом, чтобы подписаться на него, необходимо реализовать onSuccess(T) и onError.

Тут сложнее провести однозначную параллель с методами, но я думаю, что Maybe — это fun maybe(): T? Maybe — нечто среднее между Single и Completable, потому что поддерживает одно значение, отсутствие значений и ошибку. Несложно догадаться, что для подписки потребуется определить onSuccess(T), onComplete и onError.
{ }, которая возвращает null, когда результата нет.

Т.е. Тут важно обратить внимание, что onSuccess(T) и onComplete — взаимоисключающие. в случае вызова первого можно не ждать второго.

Observable — наиболее часто встречающийся источник, что обусловлено его универсальностью. Он умеет как не производить события вообще, так и генерировать множество таковых, поэтому его можно использовать всегда, когда не подходят остальные варианты. Несмотря на это, у Observable есть недостаток — он совершенно не умеет обрабатывать backpressure. Для подписки на него нужны onNext(T), onError и onComplete.

Это может привести к неприятностям вроде OutOfMemoryError. Backpressure — ситуация, когда новые события поступают существенно быстрее, чем успевают обрабатываться, и начинают скапливаться в буфере, переполняя его. Подробнее можно посмотреть тут.

ConnectableObservable — прогретый вариант Observable. Все источники данных начинают выдавать свой поток событий в момент подписки. Но только не этот парень. Для этого ConnectableObservable ждёт вызова connect. Сделано это для того, чтобы несколько наблюдателей могли обозревать один поток событий, не перезапуская его при каждой подписке. Для иллюстрации приведу следующий сниппет:

val observable = Observable.fromCallable { Log.d("RxLogs", "observable fromCallable executed") Thread.sleep(1000)
}.subscribeOn(Schedulers.computation()) observable.subscribe()
observable.subscribe()
observable.subscribe()
observable.subscribe()

В консоли будет:
observable fromCallable executed
observable fromCallable executed
observable fromCallable executed
observable fromCallable executed

val connectedObservable = Observable.fromCallable { Log.d("RxLogs", "connectedObservable fromCallable executed") Thread.sleep(1000)
}.subscribeOn(Schedulers.computation()) .publish() connectedObservable.subscribe()
connectedObservable.subscribe()
connectedObservable.subscribe()
connectedObservable.subscribe() connectedObservable.connect()

А в этом случае: observable fromCallable executed

Когда требуется обрабатывать более 10000 событий, происходящих быстро одно за другим, вместо Flowable рекомендуется использовать Observable. Flowable — источник, предоставляющий дополнительные операторы для обработки backpressure.

Последний может создавать ConnectableFlowable, открывающий те же возможности, что и ConnectableObservable.

Говоря о генераторах событий, нельзя не упомянуть Subject и Processor.

Это позволяет использовать его, например, в разного рода контроллерах, которые будут отдавать его наружу в виде Observable и внутри оповещать как Observer. Subject — класс, который может быть и источником, и обозревателем. Далее пройдёмся по разным реализациям этого класса.

При возникновении ошибки никакие события проброшены не будут. AsyncSubject/AsyncProcessor держит последнее событие до корректного завершения потока, после чего отдаёт его подписчикам.

image

После конца потока или ошибки он возвращает соответствующие события. PublishSubject/PublishProcessor пробрасывает приходящие в него события дальше, пока не поступит терминальный сигнал.

image

BehaviorSubject/BehaviorProcessor работает аналогично PublishSubject/PublishProcessor, но при подписке возвращает последнее событие, если оно есть и если Subject не перешёл в терминальное состояние.

image

Возвращает не одно последнее событие, а сколько душе угодно. ReplaySubject/ReplayProcessor — BehaviourSubject/BehaviorProcessor на стероидах. Если подписаться на завершённый ReplaySubject или ReplayProcessor, то будут получены все накопленные данные.

image

Во время подписки первый вернёт последнее событие, а второй нет. Таким образом, ReplaySubject.createWithSize(1) и BehaviourSubject.create() после перехода в терминальное состояние работают по-разному. Оно же верно и для ReplayProcessor.

CompletableSubject, MaybeSubject и SingleSubject работают аналогично PublishSubject, только рассчитаны на использование с Completable, Maybe и Single соответственно.

Он выбрасывает IllegalStateException при попытке повторной подписки. UnicastSubject/UnicastProcessor — это фактически ReplaySubject, который следит, чтобы у него был только один подписчик.

image

следующий сниппет Т.е.

val subject = UnicastSubject.create<String>(3)
repeat(3) { subject.onNext(it.toString())
} subject.onComplete() subject.subscribe({ Log.d("RxLogs", it)
}, {
}, { Log.d("RxLogs", "complete")
})

выведет в лог
0
1
2
complete

Он умеет обрабатывать backpressure для входящего в него потока. MulticastProcessor работает по аналогии с PublishProcessor, за исключением одной небольшой особенности. MulticastProcessor позволяет задать размер буфера, в котором он будет предзапрашивать элементы из upstream для будущих подписчиков.

Поэтому, когда на него подписывается первый наблюдатель, он тут же выдаёт содержимое буфера, который моментально заполняется новыми событиями. На схеме ниже создаётся процессор с хранилищем на 2 элемента, которые он сразу запрашивает у своего источника. После терминального события MulticastProcessor очищает своё хранилище и новые подписчики сразу получают завершение потока.

image

Теги
Показать больше

Похожие статьи

Добавить комментарий

Ваш e-mail не будет опубликован. Обязательные поля помечены *

Кнопка «Наверх»
Закрыть