Хабрахабр

[Перевод] Современный подход к конкурентности в Android: корутины в Kotlin

Привет, Хабр!

Сегодня мы решили предложить вашему вниманию перевод статьи, рассказывающей о корутинах Kotlin и о правильной работе с потоками в Android. Напоминаем, что у нас уже открыт предзаказ на долгожданную книгу о языке Kotlin из знаменитой серии Big Nerd Ranch Guides. Тема обсуждается очень активно, поэтому для полноты картины также рекомендуем посмотреть эту статью с Хабра и этот подробный пост из блога компании Axmor Software.

Современный фреймворк для обеспечения конкурентности в Java/Android учиняет ад обратных вызовов и приводит к блокирующим состояниям, так как в Android нет достаточно простого способа гарантировать потокобезопасность.

Корутины Kotlin – это очень эффективный и полный инструментарий, позволяющий гораздо проще и производительнее управлять конкурентностью.

Приостановка и блокирование: в чем разница

Философия корутин заключается в определении контекста, позволяющего ожидать, пока завершатся фоновые операции, не блокируя при этом основного потока. Корутины не заменяют потоков, а скорее дают фреймворк для управления ими.

Цель корутин в данном случае – обойтись без обратных вызовов и упростить конкуренцию.

Простейший пример

В нем мы извлечем изображение из потока IO и отправим это изображение на обработку обратно в Main. Для начала возьмем самый простой пример: запустим корутину в контексте Main (главный поток).

launch(Dispatchers.Main) // получить из контекста IO imageView.setImageBitmap(image) // Возвращаемся в главный поток
}

Причем, пока getImage выполняется в выделенном пуле потоков IO, главный поток свободен и может взяться за любую другую задачу! Код прост как однопоточная функция. Как только getImage() возвратится и looper из главного потока станет доступен, корутина возобновит работу в главном потоке и вызовет imageView.setImageBitmap(image). Функция withContext приостанавливает текущую корутину, пока работает ее действие (getImage()).

Мы применим дуэт async/await, чтобы две эти задачи выполнялись параллельно, и воспользуемся их результатом в главном потоке, как только обе задачи будут готовы: Второй пример: теперь нам требуется, чтобы были выполнены 2 фоновые задачи, чтобы ими можно было воспользоваться.

val job = launch(Dispatchers.Main) { val deferred1 = async(Dispatchers.Default) { getFirstValue() } val deferred2 = async(Dispatchers.IO) { getSecondValue() } useValues(deferred1.await(), deferred2.await())
}
job.join() // приостанавливает выполнение актуальной корутины, пока задача не будет выполнена

При вызове без параметров она работает в контексте, задаваемом по умолчанию для текущей области видимости. async подобен launch, но возвращает deferred (сущность Kotlin, эквивалентная Future), поэтому ее результат можно получить при помощи await().

Она работает как и в любом другом языке, с той оговоркой, что просто приостанавливает корутину, а не блокирует поток. Опять же, главный поток остается свободен, пока мы дожидаемся наших 2 значений.
Как видите, функция launch возвращает Job, который можно использовать для ожидания, пока операция завершится – это делается при помощи функции join().

Диспетчеризация

Это действие, позволяющее «перепрыгнуть» от одного потока к другому. Диспетчеризация – ключевая концепция при работе с корутинами.

Рассмотрим, как в java выглядит эквивалент для диспетчеризации в Main, то есть,

runOnUiThread:
public final void runOnUiThread(Runnable action) { if (Thread.currentThread() != mUiThread) { mHandler.post(action); // Диспетчеризация } else { action.run(); // Немедленное выполнение }
}

Итак, это действительно очень подходящая реализация: Реализация контекста Main для Android – это диспетчер на основе Handler.

launch(Dispatchers.Main) { ... } vs
launch(Dispatchers.Main, CoroutineStart.UNDISPATCHED) { ... } // Начиная с kotlinx 0.26:
launch(Dispatchers.Main.immediate) { ... }

Main) посылает Runnable в Handler, так что его код выполняется не сразу. launch(Dispatchers.

Main, CoroutineStart. launch(Dispatchers. UNDISPATCHED) немедленно выполнит свое лямбда-выражение в текущем потоке.

Main гарантирует, что когда корутина возобновит работу, она будет направлена в главный поток; кроме того, Handler используется здесь как нативная реализация Android для отправки в цикл событий приложения. Dispatchers.

Точная реализация выглядит так:

val Main: HandlerDispatcher = HandlerContext(mainHandler, "Main")

Вот хорошая статья помогающая разобраться в тонкостях диспетчеризации в Android:
Understanding Android Core: Looper, Handler, and HandlerThread.

Контекст корутины

Контекст корутины (он же – диспетчер корутины) определяет, в каком потоке будет выполняться ее код, что делать, если будет выброшено исключение, и обращается к родительскому контексту для распространения отмены.

val job = Job()
val exceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable -> whatever(throwable)
} launch(Disaptchers.Default+exceptionHandler+job) { ... }

A exceptionHandler получит все исключения, выброшенные в этих корутинах. job.cancel() отменит все корутины, родителем которых является job.

Область видимости

Интерфейс coroutineScope упрощает обработку ошибок:
Если откажет какая-либо из его дочерних корутин, то откажет и вся область видимости, и все дочерние корутины будут отменены.

В примере async, если извлечь значение не удалось, а другая задача при этом продолжила работу – у нас возникает поврежденное состояние, и с этим надо что-то делать.

Также, если deferred2 откажет, deferred1 будет отменена. При работе с coroutineScope функция useValues будет вызываться лишь в случае, если извлечение обоих значений прошло успешно.

coroutineScope { val deferred1 = async(Dispatchers.Default) { getFirstValue() } val deferred2 = async(Dispatchers.IO) { getSecondValue() } useValues(deferred1.await(), deferred2.await())
}

Также можно “поместить в область видимости” целый класс, чтобы задать для него контекст CoroutineContext по умолчанию и использовать его.

Пример класса, реализующего интерфейс CoroutineScope:

open class ScopedViewModel : ViewModel(), CoroutineScope { protected val job = Job() override val coroutineContext = Dispatchers.Main+job override fun onCleared() { super.onCleared() job.cancel() }
}

Запуск корутин в CoroutineScope:

Диспетчер launch или async, задаваемый по умолчанию, теперь становится диспетчером актуальной области видимости.

launch { val foo = withContext(Dispatchers.IO) { … } // лямбда-выражение выполняется в контексте CoroutineContext области видимости …
} launch(Dispatchers.Default) { // лямбда-выражение выполняется в задаваемом по умолчанию пуле потоков …
}

Автономный запуск корутины (вне какого-либо CoroutineScope):

GlobalScope.launch(Dispatchers.Main) { // лямбда-выражение выполняется в главном потоке. …
}

Можно даже определить область видимости для приложения, задав диспетчер Main по умолчанию:

object AppScope : CoroutineScope by GlobalScope { override val coroutineContext = Dispatchers.Main.immediate
}

Замечания

  • Корутины ограничивают интероперабельность с Java
  • Ограничивают изменяемость во избежание блокировок
  • Корутины предназначены для ожидания, а не для организации потоков
  • Избегайте I/O в Dispatchers.DefaultMain…) — для этого предназначен Dispatchers.IO
  • Потоки ресурсозатратны, поэтому используются однопоточные контексты
  • Dispatchers.Default основан на ForkJoinPool, появившемся в Android 5+
  • Корутины можно использовать посредством каналов

Избавляемся от блокировок и обратных вызовов при помощи каналов

Определение канала из документации JetBrains:

Ключевое отличие заключается в том, что он не блокирует операцию put, он предусматривает приостанавливающий send (или неблокирующий offer), а вместо блокирования операции take предусматривает приостанавливающий receive. Канал Channel концептуально очень похож на BlockingQueue.

Акторы

Рассмотрим простой инструмент для работы с каналами: Actor.

Actor, опять же, очень похож на Handler: мы определяем контекст корутины (то, есть, поток, в котором собираемся выполнять действия) и работаем с ним в последовательном порядке.

Разница, конечно же, заключается в том, что здесь используются корутины; можно указать мощность, а выполняемый код – приостановить.

Он гарантирует выполнение команды и ограничивает операции в ее контексте. В принципе, actor будет переадресовывать любую команду каналу корутины. Такой подход отлично помогает избавиться от вызовов synchronize и держать все потоки свободными!

protected val updateActor by lazy { actor<Update>(capacity = Channel.UNLIMITED) { for (update in channel) when (update) { Refresh -> updateList() is Filter -> filter.filter(update.query) is MediaUpdate -> updateItems(update.mediaList as List<T>) is MediaAddition -> addMedia(update.media as T) is MediaListAddition -> addMedia(update.mediaList as List<T>) is MediaRemoval -> removeMedia(update.media as T) } }
}
// используем
fun filter(query: String?) = updateActor.offer(Filter(query))
// или
suspend fun filter(query: String?) = updateActor.send(Filter(query))

В данном примере мы пользуемся запечатанными классами Kotlin, выбирая, какое именно действие выполнить.

sealed class Update
object Refresh : Update()
class Filter(val query: String?) : Update()
class MediaAddition(val media: Media) : Update()

Это удобный способ добиться ограничения изменяемости. Причем, все эти действия будут поставлены в очередь, параллельно выполняться они никогда не будут.

Жизненный цикл Android + корутины

Акторы могут очень пригодиться и для управления пользовательским интерфейсом Android, упрощают отмену задач и предотвращают перегрузку главного потока.
Давайте это реализуем и вызовем job.cancel() при уничтожении активности.

class MyActivity : AppCompatActivity(), CoroutineScope { protected val job = SupervisorJob() // экземпляр Job для данной активности override val coroutineContext = Dispatchers.Main.immediate+job override fun onDestroy() { super.onDestroy() job.cancel() // отмена задачи при уничтожении активности }
}

Класс SupervisorJob похож на обычный Job с тем единственным исключением, что отмена распространяется только в нисходящем направлении.

Поэтому мы не отменяем всех корутин в Activity, когда одна из них отказывает.

Чуть лучше дела обстоят с функцией расширения, позволяющей открыть доступ к этому CoroutineContext из любого View в CoroutineScope.

val View.coroutineContext: CoroutineContext? get() = (context as? CoroutineScope)?.coroutineContext

В случае множественных нажатий промежуточные действия будут игнорироваться, исключая таким образом ошибки ANR (приложение не отвечает), и эти действия будут выполняться в области видимости Activity. Теперь мы можем все это скомбинировать, функция setOnClick создает объединенный actor для управления ее действиями onClick. Поэтому при уничтожении активности все это будет отменено.

fun View.setOnClick(action: suspend () -> Unit) { // запускаем один актор в качестве родителя задачи контекста val scope = (context as? CoroutineScope)?: AppScope val eventActor = scope.actor<Unit>(capacity = Channel.CONFLATED) { for (event in channel) action() } // устанавливаем слушатель для активации этого актора setOnClickListener { eventActor.offer(Unit) }
}

Можно заменить его на Channel. В данном примере мы задаем для Channel значение Conflated, чтобы он игнорировал часть событий, если их будет слишком много. UNLIMITED, если вы предпочитаете ставить события в очередь, не теряя ни одного из них, но все равно хотите защитить приложение от ошибки ANR.

Также можно комбинировать корутины и фреймворки Lifecycle, чтобы автоматизировать отмену задач, связанных с пользовательским интерфейсом:

val LifecycleOwner.untilDestroy: Job get() { val job = Job() lifecycle.addObserver(object: LifecycleObserver { @OnLifecycleEvent(Lifecycle.Event.ON_DESTROY) fun onDestroy() { job.cancel() } }) return job
}
// использование
GlobalScope.launch(Dispatchers.Main, parent = untilDestroy) { /* здесь происходят удивительные вещи! */
}

Упрощаем ситуацию с обратными вызовами (часть 1)

Вот как можно преобразить использование API, основанного на обратных вызовах, благодаря Channel.

API работает вот так:

  1. requestBrowsing(url, listener) инициирует синтаксический разбор папки, расположенной по адресу url.
  2. Слушатель listener получает onMediaAdded(media: Media) для любого медиа-файла, обнаруженного в этой папке.
  3. listener.onBrowseEnd() вызывается по завершении синтаксического разбора папки

Вот старая функция refresh в поставщике контента для обозревателя VLC:

private val refreshList = mutableListOf<Media>() fun refresh() = requestBrowsing(url, refreshListener) private val refreshListener = object : EventListener{ override fun onMediaAdded(media: Media) { refreshList.add(media)) } override fun onBrowseEnd() { val list = refreshList.toMutableList() refreshList.clear() launch { dataset.value = list parseSubDirectories() } }
}

Как это улучшить?

Теперь обратные вызовы обозревателя будут лишь направлять медиа в этот канал, а затем закрывать его. Создаем канал, который будет запускаться в refresh.

Она создает канал, вызывает обозреватель VLC, затем формирует список медиа-файлов и обрабатывает его. Теперь функция refresh стала понятнее.

Вместо функций select или consumeEach можно использовать for для ожидания медиа, и этот цикл будет разрываться, как только канал browserChannel закроется.

private lateinit var browserChannel : Channel<Media> override fun onMediaAdded(media: Media) { browserChannel.offer(media)
} override fun onBrowseEnd() { browserChannel.close()
} suspend fun refresh() { browserChannel = Channel(Channel.UNLIMITED) val refreshList = mutableListOf<Media>() requestBrowsing(url) // Приостанавливается на каждой итерации в ожидании медиа for (media in browserChannel) refreshList.add(media) // Канал закрыт dataset.value = refreshList parseSubDirectories()
}

Упрощаем ситуацию с обратными вызовами (часть 2): Retrofit

Второй подход: мы вообще не используем корутины kotlinx, зато применяем корутинный core-фреймворк.

Смотрите, как на самом деле работают корутины!

Функция retrofitSuspendCall оборачивает запрос на вызов Retrofit Call, чтобы сделать из него функцию suspend.

Предоставленный таким образом обратный вызов обратится к continuation.resume(response), чтобы возобновить корутину откликом от сервера, как только тот будет получен. При помощи suspendCoroutine мы вызываем метод Call.enqueue и приостанавливаем корутину.

Далее нам остается просто объединить наши функции Retrofit в retrofitSuspendCall, чтобы с их помощью возвращать результаты запросов.

suspend inline fun <reified T> retrofitSuspendCall(request: () -> Call <T>
) : Response <T> = suspendCoroutine { continuation -> request.invoke().enqueue(object : Callback<T> { override fun onResponse(call: Call<T>, response: Response<T>) { continuation.resume(response) } override fun onFailure(call: Call<T>, t: Throwable) { continuation.resumeWithException(t) } })
} suspend fun browse(path: String?) = retrofitSuspendCall { ApiClient.browse(path)
} // использование (в контексте корутины Main)
livedata.value = Repo.browse(path)

Таким образом, вызов, блокирующий сеть, делается в выделенном потоке Retrofit, корутина находится здесь, ожидая отклика от сервера, а использовать ее в приложении – проще некуда!

Такая реализация вдохновлена библиотекой gildor/kotlin-coroutines-retrofit.

Также имеется JakeWharton/retrofit2-kotlin-coroutines-adapter с другой реализацией, дающей аналогичный результат.

Эпилог

Channel можно использовать и многими другими способами; посмотрите в BroadcastChannel более мощные реализации, которые могут вам пригодиться.
Также можно создавать каналы при помощи функции Produce.

Наконец, при помощи каналов удобно организовать коммуникацию между компонентами UI: адаптер может передавать события нажатий в свой фрагмент/активность через Channel или, например, через Actor.

Теги
Показать больше

Похожие статьи

Добавить комментарий

Ваш e-mail не будет опубликован. Обязательные поля помечены *

Кнопка «Наверх»
Закрыть