Хабрахабр

Миграция на инфраструктуру async-await в Rust

birds migration
img source

39, а в месте с ней и стабилизация async-await фичи. На прошлой неделе для Rust комьюнити случилось огромное событие — вышла версия компилятора 1. Детального разбора асинхронности в Rust я делать не буду, есть всё ещё актуальные статьи на хабре, которые помогут войти в тему: В этом посте я постараюсь резюмировать все релевантные изменения в компиляторе и экосистеме, а также предоставить инструкции по миграции на async-await парадигму.

Помимо указанных статей можно также обратиться к документациям стандартной библиотеки и нужных крейтов, а также почитать async-book (на англ.).

39 и должны работать на всех последующих версиях. Все примеры, рассматриваемые в статье, работают на стабильном компиляторе 1. Конечный код доступен на github.

1. Для реализации асинхронного кода использовалась библиотека futures-0. Они оперируют с типами Result<..> и предоставляют набор комбинаторов. Она предоставляет базовые типажи futures::Future и futures::Stream для работы с отложенными вычислениями. Помимо этого, библиотека предоставляет каналы для общения между задачами (task), различные интерфейсы для работы с экзекьютором и его системой задач и прочее.

Рассмотрим пример, который генерирует числовой ряд из старших 32 бит чисел Фибоначчи и отправляет их в Sink:

// futures 0.1.29
use futures::prelude::*;
use futures::; fn sink_fibb_series( sink: impl Sink<SinkItem = u32, SinkError = ()>,
) -> impl Future<Item = (), Error = ()> { stream::unfold((1u32, 1), |(mut fact, n)| { while fact.checked_mul(n).is_none() { fact >>= 1; } fact *= n; Some(future::ok((fact, (fact, n + 1)))) }) .forward(sink) .map(|_v| ())
}

Зам.: считать CPU-bound задачи на корутинах не самое лучшее применение, зато пример самодостаточен и прост.

В futures 0. Как можно заметить, код выглядит достаточно громоздко: необходимо указывать возвращаемое значение, несмотря на то, что никакого полезного значения в нем нет. 3 код становится немного проще:

// futures 0.3.1
use futures::prelude::*;
use futures::stream; async fn sink_fibb_series(sink: impl Sink<u32>) { stream::unfold((1u32, 1), |(mut fact, n)| { async move { while fact.checked_mul(n).is_none() { fact >>= 1; } fact *= n; Some((fact, (fact, n + 1))) } }) .map(Ok) .forward(sink) .map(|_v| ()) .await;
}

Поскольку в нашем случае это кортеж нулевого размера, то его можно попросту опустить, как и в обычных функциях. Здесь у функции добавляется ключевое слово async, которое оборачивает возвращаемое значение функции в Future.

Этот вызов приостанавливает выполнение в текущем async-контексте и передает управление планировщику до тех пор, пока ожидаемое Future значение не будет готово. Для ожидания выполнения в конце цепочки вызовов используется ключевое слово await. поток управления становится нелинейным по сравнению с аналогичным синхронным кодом. Затем выполнение возобновляется с последнего await (в нашем примере завершая функцию), т.е.

Эта обёртка является полным аналогом объявлением новой async-функции с таким же телом и вызовом вместо async-блока. Ещё одно значительное различие — наличие async-блока в теле замыкания внутри stream::unfold.

#[feature(async_closure)

Возможно это замыкание в скором времени написать с помощью фичи async_closure, но увы, она пока не реализована:

async |(mut fact, n)| { while fact.checked_mul(n).is_none() { fact >>= 1; } fact *= n; Some((fact, (fact, n + 1)))
}

Аналогичные изменения коснулись типажа Future, определения по версиям следующие: Как можно заметить, новый типаж Stream работает не только с элементами типа Result<..>, как это было ранее.

// futures 0.1
trait Future { type Item; type Error; fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error>;
} enum Async<T> { Ready(T), NotReady
} // futures 0.3
trait Future { type Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
} enum Poll<T> { Ready(T), Pending
}

Появился новый параметр Context, который предоставляет явный интерфейс для пробуждения текущей задачи. Помимо того, что тип возвращаемого значения может быть произвольным, также поменялись и входные параметры для Future::poll. Ранее то же самое можно было достигнуть через глобальные переменные конкретного экзекьютора (например через вызов tokio::prelude::task::current().notify()).

Эта обертка над указателем гарантирует "неподвижность" данных в памяти (более подробное описание Pin есть 1. Более фундаментальное отличие интерфейса в том, что ссылку на себя требуется оборачивать в Pin. 33 релизе компилятора на хабре, либо на английском, в документации стандартной библиотеки std::pin).

В качестве Sink возьмем половину канала из futures и на выходной стороне будем печатать результат с некоторой задержкой между итерациями. Попробуем теперь запустить наш пример. 1 такой код можно написать следующим образом: На futures-0.

use std::time::{Duration, Instant}; // futures 0.1.29
use futures::prelude::*;
use futures::sync::mpsc;
// tokio 0.1.22
use tokio::runtime::Runtime;
use tokio::timer::Delay; fn main() { let mut rt = Runtime::new().unwrap(); let (tx, rx) = mpsc::channel(32); rt.spawn(Box::new(sink_fibb_series(tx.sink_map_err(|_e| ())))); let fut = rx.take(100).for_each(|val| { println!("{}", val); Delay::new(Instant::now() + Duration::from_millis(50)) .map(|_| ()) .map_err(|_| ()) }); rt.spawn(Box::new(fut)); rt.shutdown_on_idle().wait().unwrap();
}

3 может выглядеть так: Аналогичный код с новым tokio (который на момент написания ещё alpha) и futures-0.

use std::time::Duration; // futures 0.3.1
use futures::channel::mpsc;
use futures::prelude::*;
// tokio 0.2.0-alpha.5
use tokio::timer; #[tokio::main]
async fn main() { let (tx, rx) = mpsc::channel(32); tokio::spawn(sink_fibb_series(tx)); rx.take(100) .for_each(|val| { println!("{}", val); timer::delay_for(Duration::from_millis(50)) }) .await;
}

По опыту автора, количество строк всегда выходит ощутимо меньше (порой даже при переписывании синхронного кода). Как можно заметить, код с новыми футурами стал значительно короче. Но как мне кажется, куда более весомое отличие в читабельности и отсутствия мешанины вызовов map/map_err, которые были необходимы из-за вариативности ошибок у стандартных типов в Result<..>.

Теперь они разбиты по двум разным типажам; те, которые реализованы для: Комбинаторы над элементами типа Result<..> тем не менее остались и находятся в отдельных типажах, некоторые со слегка обновленным названием.

Для примера попробуем реализовать Stream для уже рассмотренного числового ряда. Чуть более сложным оказывается реализация типажей Future и Stream. Общий тип для обеих версий футур будет следующий:

struct FactStream { fact: u32, n: u32,
} impl FactStream { fn new() -> Self { Self { fact: 1, n: 1 } }
}

1 реализация будет следующая: Для futures-0.

impl Stream for FactStream { type Item = u32; type Error = (); fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { while self.fact.checked_mul(self.n).is_none() { self.fact >>= 1; } self.fact *= self.n; self.n += 1; Ok(Async::Ready(Some(self.fact))) }
}

В случае с futures-0. В этом примере реализация Stream::poll фактически является полной копией замыкания stream::unfold. 3 реализация оказывается эквивалентной:

impl Stream for FactStream { type Item = u32; fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { while self.fact.checked_mul(self.n).is_none() { self.fact >>= 1; } self.fact *= self.n; self.n += 1; Poll::Ready(Some(self.fact)) }
}

Однако, если тип какого-нибудь поля структуры не реализует Unpin, то std::ops::DerefMut не будет реализовать на Pin<&mut T> и тем самым не будет мутабельного доступа ко всем полям:

use std::marker::PhantomPinned; struct Fact { inner: u32, // маркер убирает реализацию Unpin у структуры _pin: PhantomPinned,
} struct FactStream { fact: Fact, n: u32,
} impl Stream for FactStream { type Item = u32; fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { while self.fact.inner.checked_mul(self.n).is_none() { self.fact.inner >>= 1; // <- ошибка компиляции // trait `DerefMut` is required to modify // through a dereference, but it is not // implemented for `std::pin::Pin<&mut FactStream>` } self.fact.inner *= self.n; // <- тут аналогично self.n += 1; // <- Poll::Ready(Some(self.fact.inner)) }
}

Unpin поля (в документации есть более развернутое описание). В таком случае, в том или ином виде придется воспользоваться unsafe функциями Pin::get_unchecked_mut и Pin::map_unchecked_mut для того, чтобы получить "проекцию" ! К счастью, для таких случаев существует безопасная обёртка реализованная в крейте pin_project (детали реализации можно найти в документации библиотеки).

use pin_project::pin_project; #[pin_project]
struct FactStream { fact: Fact, n: u32,
} impl Stream for FactStream { type Item = u32; fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let mut this = self.project(); while this.fact.inner.checked_mul(*this.n).is_none() { this.fact.inner >>= 1; } this.fact.inner *= *this.n; *this.n += 1; Poll::Ready(Some(this.fact.inner)) }
}

Для этого существует модуль futures::compat, который позволяет конвертировать из старых типов в новые и наоборот. Последний момент, который хотелось бы осветить это интеропреабельность между типажами разных версий. 1 с помощью async-await: К примеру можно проитерироваться по Stream из futures-0.

use std::fmt::Display; // futures 0.3
use new_futures::compat::Compat01As03 as Compat;
use new_futures::StreamExt as _;
// futures 0.1
use old_futures::Stream as OldStream; async fn stream_iterate<E>( old_stream: impl OldStream<Item = impl Display, Error = E>,
) -> Result<(), E> { let stream = Compat::new(old_stream); let mut stream = Box::pin(stream); while let Some(item) = stream.as_mut().next().await.transpose()? { println!("{}", item); } Ok(())
}

Тем не менее, на нём мир не заканчивается, например существует альтернативный async-std, который помимо этого предоставляет футурные обертки для типов стандартной библиотеки, а также ThreadPool и LocalPool из рассмотренной библиотеки futures-0. Примечание: в статье рассматривается только экзекьютор tokio, как наиболее долгоживущий и распространенный. 3.

Теги
Показать больше

Похожие статьи

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *

Кнопка «Наверх»
Закрыть