Хабрахабр

[Перевод] Асинхронные Stream в C# 8

Другими словами, асинхронные методы помогают разработчикам выполнять асинхронные операции, которые не блокируют потоки и возвращают один скалярный результат. Функционал Async/Await появился в C# 5, чтобы улучшить скорость отклика пользовательского интерфейса и веб-доступ к ресурсам. После многочисленных попыток Microsoft упростить асинхронные операции, шаблон async/await завоевал хорошую репутацию среди разработчиков благодаря простому подходу.

Давайте рассмотрим некий обычный для такого синтаксиса метод async Task<int> DoAnythingAsync(). Существующие асинхронные методы значительно ограничены тем, что должны возвращать только одно значение. Из-за такого ограничения нельзя использовать эту функцию с ключевым словом yield и асинхронным интерфейсом IEnumerable<int> (чтобы вернуть результат асинхронного перечисления). Результатом его работы является некоторое одно значение.

Если объединить функцию async/await и оператор yield, тогда можно было бы использовать мощную модель программирования, известную как asynchronous data pull, или перечисление на основе pull based enumeration или асинхронную последовательность async sequence, как она называется в F#.

Эти изменения придадут асинхронному шаблону больше гибкости, а пользователь сможет извлекать данные откуда-либо (например из БД) с помощью отложенных асинхронных последовательностей или получать данные из асинхронных последовательностей частями по мере доступности. Новая возможность использования асинхронных потоков в C# 8 снимает ограничение, связанное с возвратом единственного результата, и позволяет асинхронному методу возвращать несколько значений.

Пример:

foreach await (var streamChunck in asyncStreams)
”);
}

Rx приобретает всё большее значение среди разработчиков и этот метод используется во многих языках программирования, например Java (RxJava) и JavaScript (RxJS). Ещё один подход для решения проблем, связанных с асинхронным программированием, заключается в использовании реактивных расширений (Rx).

Т.е. В основе Rx лежит модель на базе проталкивания данных (push-коллекции) (принцип Tell Don’t Ask), также известная, как реактивное программирование. Данные проталкиваются в очередь в асинхронном режиме и потребитель использует их в момент поступления. в отличии от IEnumerable, когда потребитель запрашивает следующий элемент, в модели Rx поставщик данных сигнализирует потребителю о появлении в последовательности нового элемента.

Вся концепция и преимущества рассматриваются с помощью множества примеров и демонстрационного кода. В этой статье я сравню модель на основе проталкивания данных (такой, как Rx) с моделью на основе вытягивания данных (как IEnumerable), а также покажу, для каких сценариев лучше подходит та или иная модель. В конце я покажу применение и продемонстрирую его на примере кода.

Сравнение модели на основе проталкивания данных с моделью на основе вытягивания данных (pull-)

-1- Сравнение модели на основе вытягивания данных с моделью на основе проталкивания данных
Рис.

-1-. Эти примеры основаны на взаимоотношениях поставщика и потребителя данных, как показано на рис. В ней потребитель запрашивает и получает данные от поставщика. Модель на основе вытягивания (pull-) данных проста для понимания. Здесь, поставщик публикует данные в очереди и потребитель должен подписаться на неё, чтобы получить их. Альтернативным подходом является модель на основе проталкивания данных (push-).

Таким образом, потребитель получает только необходимые данные, что позволяет избежать проблем с переполнением. Модель на основе вытягивания данных подходит для тех случаев, где поставщик генерирует данные быстрее, чем потребитель использует их. В этом случае поставщик может отправить потребителю больше данных, чтобы не возникло ненужных задержек. Если потребитель использует данные быстрее, чем поставщик их производит, подойдёт модель на основе проталкивания данных.

Чтобы решить проблемы поставщика и получателя, описанные выше, метод применяет как проталкивание, так и вытягивание данных. Rx и Akka Streams (модель программирования на основе потоков) используют метод обратного давления для управления потоком.

После того как потребитель обработает текущий элемент, он запросит у поставщика следующий и так до конца последовательности. В примере ниже медленный потребитель вытягивает данные со стороны более быстрого поставщика.

Мотивация для использования и основная информация

Чтобы понять всю необходимость асинхронных потоков, рассмотрим следующий код.

// Запускаем цикл и суммируем предложенный аргумент (count)
static int SumFromOneToCount(int count)
{ ConsoleExt.WriteLine("SumFromOneToCount called!"); var sum = 0; for (var i = 0; i <= count; i++) { sum = sum + i; } return sum;
} // Вызов метода: const int count = 5;
ConsoleExt.WriteLine($"Starting the application with count: {count}!");
ConsoleExt.WriteLine("Classic sum starting.");
ConsoleExt.WriteLine($"Classic sum result: {SumFromOneToCount(count)}");
ConsoleExt.WriteLine("Classic sum completed.");
ConsoleExt.WriteLine("################################################");
ConsoleExt.WriteLine(Environment.NewLine);

Вывод:

Мы можем сделать метод отложенным, используя оператор yield, как показано ниже.

static IEnumerable<int> SumFromOneToCountYield(int count)
{ ConsoleExt.WriteLine("SumFromOneToCountYield called!"); var sum = 0; for (var i = 0; i <= count; i++) { sum = sum + i; yield return sum; }
}

Вызов метода

const int count = 5;
ConsoleExt.WriteLine("Sum with yield starting.");
foreach (var i in SumFromOneToCountYield(count))
{ ConsoleExt.WriteLine($"Yield sum: {i}");
}
ConsoleExt.WriteLine("Sum with yield completed."); ConsoleExt.WriteLine("################################################");
ConsoleExt.WriteLine(Environment.NewLine);

Вывод:

Показанные выше суммарные результаты известны как отложенное перечисление. Как показано выше в окне вывода, результат возвращается частями, а не одним значением. Если посмотреть на потоки, можно увидеть, что всё запущено в основном потоке. Однако, проблема по-прежнему не решена: методы суммирования блокируют код.

Давайте применим волшебное слово async к первому методу SumFromOneToCount (без yield).

static async Task<int> SumFromOneToCountAsync(int count)
{ ConsoleExt.WriteLine("SumFromOneToCountAsync called!"); var result = await Task.Run(() => { var sum = 0; for (var i = 0; i <= count; i++) { sum = sum + i; } return sum; }); return result;
}

Вызов метода

const int count = 5;
ConsoleExt.WriteLine("async example starting.");
// Операция суммирования запущена в асинхронном режиме. Но, этого недостаточно. Нужно, чтобы суммирование шло в асинхронном режиме с задержкой.
var result = await SumFromOneToCountAsync(count);
ConsoleExt.WriteLine("async Result: " + result);
ConsoleExt.WriteLine("async completed."); ConsoleExt.WriteLine("################################################");
ConsoleExt.WriteLine(Environment.NewLine);

Вывод:

Теперь вычисления выполняются в другом потоке, однако проблема с результатом всё ещё существует. Отлично. Комбинация называется асинхронные потоки и это новая функция в C# 8. Система так и возвращает результат единым значением.
Представьте, что мы можем совместить отложенные перечисления (оператор yield) и асинхронные методы в императивном стиле программирования. Она великолепно подходит для решения проблем, связанных с моделью программирования на основе вытягивания данных, например скачивания данных с сайта или чтения записей в файле или базе данных современными способами.

Я добавлю ключевое слово async к методу SumFromOneToCountYield следующим образом: Давайте попробуем сделать это в текущей версии C#.

-2- Ошибка при одновременном использовании yield и ключевого слова async.
Рис.

Мы можем убрать ключевое слово yield и применить IEnumerable в задаче, как показано ниже: Когда мы пытаемся добавить async к SumFromOneToCountYield, возникает ошибка, как показано выше.
Давайте попробуем по-другому.

static async Task<IEnumerable<int>> SumFromOneToCountTaskIEnumerable(int count)
{ ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable called!"); var collection = new Collection<int>(); var result = await Task.Run(() => { var sum = 0; for (var i = 0; i <= count; i++) { sum = sum + i; collection.Add(sum); } return collection; }); return result;
}

Вызов метода

const int count = 5;
ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable started!");
var scs = await SumFromOneToCountTaskIEnumerable(count);
ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable done!"); foreach (var sc in scs)
{ // Это не то, что нужно. Мы получили результат единым блоком. ConsoleExt.WriteLine($"AsyncIEnumerable Result: {sc}");
} ConsoleExt.WriteLine("################################################");
ConsoleExt.WriteLine(Environment.NewLine);

Вывод:

Результаты (все результаты собраны в коллекцию) возвращаются в виде одного блока. Как видно из примера, всё вычисляется в асинхронном режиме, но проблема по-прежнему осталась. Если помните, нашей целью было совместить асинхронный режим вычисления с возможностью задержки. А это не то, что нам нужно.

Для этого нужно использовать внешнюю библиотеку, например Ix (часть Rx), или асинхронные потоки, представленные в C#.

Чтобы продемонстрировать асинхронное поведение, я использовал внешнюю библиотеку. Давайте вернёмся к нашему коду.

static async Task ConsumeAsyncSumSeqeunc(IAsyncEnumerable<int> sequence)
{ ConsoleExt.WriteLineAsync("ConsumeAsyncSumSeqeunc Called"); await sequence.ForEachAsync(value => { ConsoleExt.WriteLineAsync($"Consuming the value: {value}"); // моделируем некоторую задержку Task.Delay(TimeSpan.FromSeconds(1)).Wait(); });
} static IEnumerable<int> ProduceAsyncSumSeqeunc(int count)
{ ConsoleExt.WriteLineAsync("ProduceAsyncSumSeqeunc Called"); var sum = 0; for (var i = 0; i <= count; i++) { sum = sum + i; // моделируем некоторую задержку Task.Delay(TimeSpan.FromSeconds(0,5)).Wait(); yield return sum; }
}

Вызов метода

const int count = 5;
ConsoleExt.WriteLine("Starting Async Streams Demo!"); // Запускаем новую задачу. Она используется для создания асинхронной последовательности данных.
IAsyncEnumerable<int> pullBasedAsyncSequence = ProduceAsyncSumSeqeunc(count).ToAsyncEnumerable(); ConsoleExt.WriteLineAsync("X#X#X#X#X#X#X#X#X#X# Doing some other work X#X#X#X#X#X#X#X#X#X#"); // Запускаем ещё одну задачу; она потребляет данные из асинхронной последовательности.
var consumingTask = Task.Run(() => ConsumeAsyncSumSeqeunc(pullBasedAsyncSequence)); // Просто для демонстрации. Подождите, пока задача не завершится.
consumingTask.Wait();
ConsoleExt.WriteLineAsync("Async Streams Demo Done!");

Вывод:

Можно выполнить цикл перечисления в асинхронном режиме.
Исходный код см. Наконец, мы видим нужное поведение. здесь.

Вытягивание данных в асинхронном режиме на примере клиент-серверной архитектуры

Все преимущества этой функции лучше всего видны в контексте клиент-серверной архитектуры. Давайте рассмотрим эту концепцию на более реалистичном примере.

Синхронный вызов в случае клиент-серверной архитектуры

е. Отправляя запрос серверу, клиент вынужден ждать (т. -3-. он заблокирован), пока придёт ответ, как показано на рис.

-3- Синхронное вытягивание данных, во время которого клиент ожидает, пока не закончится обработка запроса
Рис.

Асинхронное вытягивание данных

Как только данные получены, клиент продолжит выполнять работу. В этом случае клиент запрашивает данные и переходит к другим задачам.

-4- Асинхронное вытягивание данных, во время которого клиент может выполнять другие задачи, пока данные запрашиваются
Рис.

Вытягивание данных в виде асинхронной последовательности

Затем, получив данные, клиент обрабатывает их и запрашивает следующую часть и так до тех пор, пока не получить все данные. В этом случае клиент запрашивает часть данных и продолжает выполнять другие задачи. На рис. Именно из этого сценария появилась идея асинхронных потоков. -5- показано, как клиент может обрабатывать полученные данные или выполнять другие задачи.

-5- Вытягивание данных в виде асинхронной последовательности (асинхронные потоки).
Рис. Клиент не заблокирован.

Асинхронные потоки

Подобно IEnumerable<T> и IEnumerator<T> существует два новых интерфейса IAsyncEnumerable<T> и IAsyncEnumerator<T>, которые определяются как показано ниже:

public interface IAsyncEnumerable<out T>
{ IAsyncEnumerator<T> GetAsyncEnumerator();
} public interface IAsyncEnumerator<out T> : IAsyncDisposable
{ Task<bool> MoveNextAsync(); T Current { get; }
} //асинхронные потоки также предполагают асинхронное освобождение
public interface IAsyncDisposable
{ Task DiskposeAsync();
}

Здесь я не буду вдаваться в подробности, поэтому рекомендую прочитать его статью. В InfoQ эту тему разобрал Джонатан Аллен.

MoveNext() ). Весь фокус в возвращаемом значении Task<bool> MoveNextAsync() (изменённом с bool на Task<bool>, bool IEnumerator. Потребитель сам решает, когда получить следующее значение. Благодаря ему все вычисления, а также их итерирование, будут происходить асинхронно. Для асинхронной очистки ресурсов можно использовать интерфейс IAsyncDisposable. Несмотря на то что это асинхронная модель, она всё ещё использует вытягивание данных. Более подробную информацию об асинхронных потоках можно прочитать здесь.

Синтаксис

Окончательный вариант синтаксиса должен приблизительно выглядеть, как показано ниже:

foreach await (var dataChunk in asyncStreams)
{ // Обработка части данных с помощью ключевого слова yield или выполнение других задач.
}

Из примера выше понятно, что вместо вычисления одного значения, мы, теоретически, можем последовательно вычислить множество значений, параллельно ожидая другие асинхронные операции.

Переработанный пример Microsoft

Его можно скачать целиком с моего репозитория GitHub. Я переписал демонстрационный код Microsoft.

Во время каждой итерации из массива вытягивается 8 Кб. В основе примера лежит идея создать большой поток в памяти (массив в 20 000 байт) и последовательно извлекать из него элементы в асинхронном режиме.


Затем, во время шага (2) определяется переменная под названием checksum. На шаге (1) создаётся большой массив данных, заполняемый фиктивными значениями. Массив и контрольная сумма создаются в памяти и возвращаются в виде последовательности элементов на шаге (3). Эта переменная, содержащая контрольную сумму, предназначена для проверки корректности суммы вычислений.

Шаг (4) связан с применением метода расширения AsEnumarble (более подходящее название AsAsyncEnumarble), который помогает моделировать асинхронный поток в 8 Кб (BufferSize = 8000 элементов (6))

Наследовать от IAsyncEnumerable обычно не нужно, но в примере, показанном выше, эта операция выполняется, чтобы упростить демонстрационный код, как показано на шаге (5).

Процесс вытягивания происходит последовательно: когда потребитель (часть кода, содержащая foreach) готов получить следующую часть данных, он вытягивает их у поставщика (массив, содержащийся в потоке в памяти). Шаг (7) связан с использованием ключевого слова foreach, которое вытягивает порции данных по 8 Кб из асинхронного потока в памяти. Наконец, когда цикл завершён, программа проверит значение 'c' на соответствие контрольной сумме и, если они совпадают, выведет сообщение "Checksums match!", согласно шагу (8).

Окно вывода из демонстрационного примера Microsoft:

Заключение

Она отличается от модели на основе проталкивания данных IObservable<T>, при использовании которой значения генерируются независимо от состояния потребителя. Мы рассмотрели асинхронные потоки, которые великолепно подходят для асинхронного вытягивания данных и написания кода, генерирующего несколько значений в асинхронном режиме.
Используя эту модель, можно запрашивать следующий элемент данных в последовательности и получать ответ. Примеры включают использование веб-приложений или чтение записей в базе данных. Асинхронные потоки позволяют отлично представлять асинхронные источники данных, управляемые потребителем, когда он сам определяет готовность принять следующую порцию данных.

Также я показал, какие преимущества даёт эта функция при скачивании контента из Интернета. Я продемонстрировал, как создать перечисление в асинхронном режиме и использовать его с помощью внешней библиотеки с асинхронной последовательностью. Наконец, мы рассмотрели новый синтаксис асинхронных потоков, а также полный пример его использования на основе Microsoft Build Demo Code (7–9 мая, 2018// Сиэтл, штат Вашингтон)

Теги
Показать больше

Похожие статьи

Добавить комментарий

Ваш e-mail не будет опубликован. Обязательные поля помечены *

Кнопка «Наверх»
Закрыть