Хабрахабр

[Перевод] System.IO.Pipelines: высокоэффективный IO в .NET

System.IO.Pipelines — это новая библиотека, упрощающая организацию кода в .NET. Трудно обеспечить высокую производительность и точность, если приходится иметь дело со сложным кодом. Задача System.IO.Pipelines — упростить код. Подробнее под катом!

NET Core, которые стремились сделать Kestrel одним из самых быстрых веб-серверов в отрасли. Библиотека появилась в результате усилий команды разработчиков . 1 в качестве BCL API первого класса (System. Она изначально задумывалась как часть реализации Kestrel, но превратилась в повторно используемый API, доступный в версии 2. Pipelines). IO.

Какие проблемы она решает?

Чтобы правильно анализировать данные из потока или сокета, требуется написать большой объем стандартного кода. При этом существует множество подводных камней, которые усложняют и сам код, и его поддержку.

Какие сложности возникают сегодня?

Начнем с простой задачи. Нам необходимо написать TCP-сервер, который получает от клиента сообщения с разделителями строк (\n).

Сервер TCP с NetworkStream

ОТСТУПЛЕНИЕ: как и в любой задаче, требующей высокой производительности, каждый конкретный случай стоит рассматривать исходя из особенностей вашего приложения. Возможно, тратить ресурсы на использование различных подходов, о которых пойдет речь далее, не имеет смысла, если масштаб сетевого приложения не очень велик.

NET перед использованием конвейеров выглядит примерно так: Обычный код .

async Task ProcessLinesAsync(NetworkStream stream)

см. sample1.cs на GitHub

Вероятно, этот код будет работать при локальном тестировании, но он имеет ряд ошибок:

  • Возможно, после одного вызова ReadAsync не будет получено целое сообщение (до конца строки).
  • Он игнорирует результат работы метода stream.ReadAsync() — количество данных, фактически переданных в буфер.
  • Код не обрабатывает прием нескольких строк в одном вызове ReadAsync.

Это наиболее распространенные ошибки чтения потоковых данных. Чтобы их избежать, необходимо внести ряд изменений:

  • Нужно буферизовать входящие данные, пока не будет найдена новая строка.
  • Необходимо проанализировать все строки, возвращаемые в буфер.

async Task ProcessLinesAsync(NetworkStream stream) { var buffer = new byte[1024]; var bytesBuffered = 0; var bytesConsumed = 0; while (true) { var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, buffer.Length - bytesBuffered); if (bytesRead == 0) { // EOF break; } // Keep track of the amount of buffered bytes bytesBuffered += bytesRead; var linePosition = -1; do { // Look for a EOL in the buffered data linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed, bytesBuffered - bytesConsumed); if (linePosition >= 0) { // Calculate the length of the line based on the offset var lineLength = linePosition - bytesConsumed; // Process the line ProcessLine(buffer, bytesConsumed, lineLength); // Move the bytesConsumed to skip past the line we consumed (including \n) bytesConsumed += lineLength + 1; } } while (linePosition >= 0); } }

см. sample2.cs на GitHub

Необходимо увеличить размер входного буфера до тех пор, пока не будет найдена новая строка. Повторюсь: это могло бы сработать при локальном тестировании, но иногда встречаются строки длиной больше 1 Кб (1024 байта).

Мы можем улучшить этот процесс с помощью ArrayPool, что позволит избежать повторного распределения буферов во время анализа длинных строк, поступающих от клиента. Кроме того, мы собираем буферы в массив при обработке длинных строк.

async Task ProcessLinesAsync(NetworkStream stream) { byte[] buffer = ArrayPool<byte>.Shared.Rent(1024); var bytesBuffered = 0; var bytesConsumed = 0; while (true) { // Calculate the amount of bytes remaining in the buffer var bytesRemaining = buffer.Length - bytesBuffered; if (bytesRemaining == 0) { // Double the buffer size and copy the previously buffered data into the new buffer var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2); Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length); // Return the old buffer to the pool ArrayPool<byte>.Shared.Return(buffer); buffer = newBuffer; bytesRemaining = buffer.Length - bytesBuffered; } var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining); if (bytesRead == 0) { // EOF break; } // Keep track of the amount of buffered bytes bytesBuffered += bytesRead; do { // Look for a EOL in the buffered data linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed, bytesBuffered - bytesConsumed); if (linePosition >= 0) { // Calculate the length of the line based on the offset var lineLength = linePosition - bytesConsumed; // Process the line ProcessLine(buffer, bytesConsumed, lineLength); // Move the bytesConsumed to skip past the line we consumed (including \n) bytesConsumed += lineLength + 1; } } while (linePosition >= 0); } }

см. sample3.cs на GitHub

Также используется больше памяти, поскольку логика не сокращает буфер после обработки строк. Код работает, но теперь изменился размер буфера, в результате появляется множество его копий. Чтобы этого избежать, можно сохранять список буферов, а не менять каждый раз размер буфера при поступлении строк длиннее 1 Кб.

Это значит, что мы будем передавать в ReadAsync буферы все меньшего размера, в результате возрастет число вызовов операционной системы. Кроме того, мы не увеличиваем буфер размером 1 Кб, пока он полностью не опустеет.

Мы постараемся исключить это и будем выделять новый буфер, как только размер существующего станет меньше 512 байт:

public class BufferSegment { public byte[] Buffer { get; set; } public int Count { get; set; } public int Remaining => Buffer.Length - Count; } async Task ProcessLinesAsync(NetworkStream stream) { const int minimumBufferSize = 512; var segments = new List<BufferSegment>(); var bytesConsumed = 0; var bytesConsumedBufferIndex = 0; var segment = new BufferSegment { Buffer = ArrayPool<byte>.Shared.Rent(1024) }; segments.Add(segment); while (true) { // Calculate the amount of bytes remaining in the buffer if (segment.Remaining < minimumBufferSize) { // Allocate a new segment segment = new BufferSegment { Buffer = ArrayPool<byte>.Shared.Rent(1024) }; segments.Add(segment); } var bytesRead = await stream.ReadAsync(segment.Buffer, segment.Count, segment.Remaining); if (bytesRead == 0) { break; } // Keep track of the amount of buffered bytes segment.Count += bytesRead; while (true) { // Look for a EOL in the list of segments var (segmentIndex, segmentOffset) = IndexOf(segments, (byte)'\n', bytesConsumedBufferIndex, bytesConsumed); if (segmentIndex >= 0) { // Process the line ProcessLine(segments, segmentIndex, segmentOffset); bytesConsumedBufferIndex = segmentOffset; bytesConsumed = segmentOffset + 1; } else { break; } } // Drop fully consumed segments from the list so we don't look at them again for (var i = bytesConsumedBufferIndex; i >= 0; --i) { var consumedSegment = segments[i]; // Return all segments unless this is the current segment if (consumedSegment != segment) { ArrayPool<byte>.Shared.Return(consumedSegment.Buffer); segments.RemoveAt(i); } } } } (int segmentIndex, int segmentOffest) IndexOf(List<BufferSegment> segments, byte value, int startBufferIndex, int startSegmentOffset) { var first = true; for (var i = startBufferIndex; i < segments.Count; ++i) { var segment = segments[i]; // Start from the correct offset var offset = first ? startSegmentOffset : 0; var index = Array.IndexOf(segment.Buffer, value, offset, segment.Count - offset); if (index >= 0) { // Return the buffer index and the index within that segment where EOL was found return (i, index); } first = false; } return (-1, -1); }

см. sample4.cs на GitHub

Во время поиска разделителя мы отслеживаем заполненные буферы. В итоге код существенно усложняется. В результате ProcessLine и IndexOf будут принимать List вместо byte[], offset и count. Для этого используется List, который отображает буферизованные данные при поиске нового разделителя строк. Логика синтаксического анализа начнет обрабатывать один сегмент буфера или несколько.

Однако нужно сделать еще ряд изменений: И теперь сервер будет обрабатывать частичные сообщения и использовать объединенную память, чтобы уменьшить общее потребление памяти.

  1. Из ArrayPoolbyte мы используем только Byte[] — стандартно управляемые массивы. Иными словами, при выполнении функции ReadAsync или WriteAsync срок действия буферов привязывается ко времени осуществления асинхронной операции (чтобы взаимодействовать с собственными API ввода-вывода операционной системы). Поскольку закрепленная память не может перемещаться, это сказывается на производительности сборщика мусора и может вызвать фрагментацию массива. Возможно, реализацию пула придется изменить, в зависимости от того, как долго асинхронные операции будут ожидать исполнения.
  2. Пропускную способность можно улучшить, если разорвать связь между логикой чтения и обработки. Мы получаем эффект пакетной обработки, и теперь логика синтаксического анализа сможет считывать большие объемы данных, обрабатывая большие блоки буферов, а не анализируя отдельные строки. В результате код усложняется еще больше:
    • Необходимо создать два цикла, работающих независимо друг от друга. Первый будет считывать данные из сокета, а второй — анализировать буферы.
    • Нужен способ сообщать логике синтаксического анализа, что данные становятся доступны.
    • Также необходимо определить, что произойдет, если цикл будет считывать данные из сокета слишком быстро. Нам нужен способ регулировать цикл считывания, если логика синтаксического анализа не поспевает за ним. Обычно это называют «управлением потоком» или «сопротивлением потоку».
    • Мы должны убедиться, что данные передаются безопасно. Теперь набор буферов используется и циклом считывания, и циклом синтаксического анализа, они работают независимо друг от друга на разных потоках.
    • Логика управления памятью также задействуется двумя разными фрагментами кода: заимствующим данные из буферного пула, который считывает данные из сокета, и возвращающим из буферного пула, который является логикой синтаксического анализа.
    • Нужно быть предельно осторожными с возвратом буферов после исполнения логики синтаксического анализа. Иначе есть вероятность того, что мы вернем буфер, в который все еще ведется запись логики чтения сокета.

Сложность начинает зашкаливать (а это далеко не все случаи!). Для создания высокопроизводительной сети нужно написать очень сложный код.

IO. Цель System. Pipelines — упростить эту процедуру.

TCP-сервер и System.IO.Pipelines

Давайте посмотрим, как работает System.IO.Pipelines:

async Task ProcessLinesAsync(Socket socket) { var pipe = new Pipe(); Task writing = FillPipeAsync(socket, pipe.Writer); Task reading = ReadPipeAsync(pipe.Reader); return Task.WhenAll(reading, writing); } async Task FillPipeAsync(Socket socket, PipeWriter writer) { const int minimumBufferSize = 512; while (true) { // Allocate at least 512 bytes from the PipeWriter Memory<byte> memory = writer.GetMemory(minimumBufferSize); try { int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None); if (bytesRead == 0) { break; } // Tell the PipeWriter how much was read from the Socket writer.Advance(bytesRead); } catch (Exception ex) { LogError(ex); break; } // Make the data available to the PipeReader FlushResult result = await writer.FlushAsync(); if (result.IsCompleted) { break; } } // Tell the PipeReader that there's no more data coming writer.Complete(); } async Task ReadPipeAsync(PipeReader reader) { while (true) { ReadResult result = await reader.ReadAsync(); ReadOnlySequence<byte> buffer = result.Buffer; SequencePosition? position = null; do { // Look for a EOL in the buffer position = buffer.PositionOf((byte)'\n'); if (position != null) { // Process the line ProcessLine(buffer.Slice(0, position.Value)); // Skip the line + the \n character (basically position) buffer = buffer.Slice(buffer.GetPosition(1, position.Value)); } } while (position != null); // Tell the PipeReader how much of the buffer we have consumed reader.AdvanceTo(buffer.Start, buffer.End); // Stop reading if there's no more data coming if (result.IsCompleted) { break; } } // Mark the PipeReader as complete reader.Complete(); }

см. sample5.cs на GitHub

В конвейерной версии нашего считывателя строк есть два цикла:

  • FillPipeAsync считывает из сокета и записывает в PipeWriter.
  • ReadPipeAsync считывает из PipeReader и анализирует входящие строки.

В отличие от первых примеров, здесь нет специально назначенных буферов. Это одна из основных функций System.IO.Pipelines. Все задачи по управлению буферами передаются реализациям PipeReader/PipeWriter.

Процедура упрощается: мы используем код только для бизнес-логики вместо того, чтобы реализовывать сложное управление буферами.

GetMemory(int), чтобы получить определенный объем памяти от основного записывателя. В первом цикле сначала вызывается PipeWriter. Advance(int), который сообщает PipeWriter, сколько данных фактически записано в буфер. Затем вызывается PipeWriter. FlushAsync(), чтобы PipeReader получил доступ к данным. После этого следует вызов PipeWriter.

Когда возвращается запрос к PipeReader. Второй цикл потребляет буферы, которые были записаны PipeWriter, но изначально поступили от сокета. Когда будет найден разделитель конца строки (EOL) и проанализирована строка, мы разделим буфер на части, чтобы пропустить уже обработанный фрагмент. ReadAsync(), мы получаем ReadResult, содержащий два важных сообщения: данные, считанные в форме ReadOnlySequence, а также логический тип данных IsCompleted, который сообщает считывателю, закончил ли записыватель работу (EOF). AdvanceTo, и он сообщает PipeReader, сколько данных было потреблено. После этого вызывается PipeReader.

В результате основной канал высвобождает всю выделенную память. В конце каждого цикла завершается работа и считывателя, и записывателя.

System.IO.Pipelines

Частичное чтение

Кроме управления памятью System.IO.Pipelines выполняет другую важную функцию: просматривает данные в канале, но не потребляет их.

ReadAsync получает данные из канала, AdvanceTo сообщает PipeReader о том, что эти буферы больше не требуются считывателю, поэтому от них можно избавиться (например, вернуть в основной буферный пул). У PipeReader есть два основных API: ReadAsync и AdvanceTo.

Ниже приведен пример анализатора HTTP, который считывает данные из буферов частичных данных канала, пока не получит подходящую начальную строку.

ReadOnlySequenceT

Реализация канала хранит список связанных буферов, передающихся между PipeWriter и PipeReader. PipeReader.ReadAsync раскрывает ReadOnlySequence, являющийся новым типом BCL и состоящий из одного или нескольких сегментов ReadOnlyMemory. Он похож на Span или Memory, что дает нам возможность взглянуть на массивы и строки.

SequencePosition представляет собой единую точку в связанном списке буферов и используется для эффективного разделения ReadOnlySequence<Т>. Внутри канала есть указатели, которые показывают, где в общем наборе выделенных данных располагаются считыватель и записыватель, а также обновляют их по мере записи и чтения данных.

Поскольку ReadOnlySequence<Т> поддерживает один сегмент и более, то стандартной операцией высокопроизводительной логики является разделение быстрых и медленных путей исходя из количества сегментов.

В качестве примера приведем функцию, преобразующую ASCII ReadOnlySequence в строку:

string GetAsciiString(ReadOnlySequence<byte> buffer) { if (buffer.IsSingleSegment) { return Encoding.ASCII.GetString(buffer.First.Span); } return string.Create((int)buffer.Length, buffer, (span, sequence) => { foreach (var segment in sequence) { Encoding.ASCII.GetChars(segment.Span, span); span = span.Slice(segment.Length); } }); }

см. sample6.cs на GitHub

Сопротивление потоку и управление потоком

В идеале чтение и анализ работают совместно: поток чтения потребляет данные из сети и помещает их в буферы, в то время как поток анализа создает подходящие структуры данных. Обычно анализ занимает больше времени, чем простое копирование блоков данных из сети. В результате поток чтения может с легкостью перегрузить поток анализа. Поэтому поток чтения будет вынужден либо замедлить работу, либо потреблять больше памяти, чтобы сохранять данные для потока анализа. Чтобы обеспечить оптимальную производительность, необходим баланс между частотой пауз и выделением большого объема памяти.

PauseWriterThreshold определяет, сколько данных необходимо буферизовать до приостановки PipeWriter. Для решения этой проблемы конвейер имеет две функции управления потоком данных: PauseWriterThreshold и ResumeWriterThreshold. ResumeWriterThreshold определяет, сколько памяти может потребить считыватель до возобновления работы записывателя. FlushAsync.

FlushAsync «блокируется», когда количество данных в конвейерном потоке превысит лимит, установленный в PauseWriterThreshold, и «разблокируется», когда оно станет ниже установленного в ResumeWriterThreshold. PipeWriter. Чтобы предотвратить превышение лимита потребления, используются всего два значения.

Планирование ввода-вывода

При использовании async/await последующие операции обычно вызываются либо в потоках пула, либо в текущем SynchronizationContext.

Это имеет критическое значение для высокопроизводительных приложений, таких как веб-серверы. При осуществлении ввода-вывода очень важно тщательно контролировать, где он выполняется, чтобы эффективнее использовать кэш процессора. IO. System. Это позволяет очень точно контролировать, какие потоки использовать для ввода-вывода. Pipelines использует PipeScheduler, чтобы определить место выполнения асинхронных ответных вызовов.

Пример практического применения — транспорт Kestrel Libuv, в котором обратные вызовы ввода-вывода выполняются по выделенным каналам цикла событий.

Есть и другие преимущества шаблона PipeReader

  • Некоторые базовые системы поддерживают «ожидание без буферизации»: буфер не нужно выделять то тех пор, пока в базовой системе не появятся доступные данные. Так, в Linux с epoll можно не предоставлять буфер для считывания до тех пор, пока данные не будут подготовлены. Это позволяет избежать ситуации, когда имеется множество потоков, ожидающих данные, и требуется сразу же резервировать огромный объем памяти.
  • Конвейер по умолчанию упрощает запись модульных тестов сетевого кода: логика синтаксического анализа отделена от сетевого кода, и модульные тесты запускают эту логику только в буферах в памяти, а не потребляют ее непосредственно из сети. Он также упрощает тестирование сложных шаблонов с отправкой частичных данных. ASP.NET Core использует его для проверки различных аспектов http-средств синтаксического анализа Kestrel.
  • Системы, позволяющие пользовательскому коду задействовать основные буферы ОС (например, зарегистрированные API ввода-вывода Windows), изначально подходят для использования конвейеров, поскольку реализация PipeReader всегда предоставляет буферы.

Другие связанные типы

Мы также добавили в System.IO.Pipelines ряд новых простых типов BCL:

  • MemoryPoolT, IMemoryOwnerT, MemoryManagerT. В .NET Core 1.0 был добавлен ArrayPoolT, а в .NET Core 2.1 теперь имеется более общее абстрактное представление для пула, который работает с любыми MemoryT. Мы получаем точку расширяемости, позволяющую осуществлять более продвинутые стратегии распределения, а также контролировать управление буферами (например, использовать предустановленные буферы вместо исключительно управляемых массивов).
  • IBufferWriterT представляет собой приемник для записи синхронных буферизованных данных (реализуется PipeWriter).
  • IValueTaskSource — ValueTaskT существует со времени выпуска .NET Core 1.1, но в .NET Core 2.1 приобрел чрезвычайно эффективные инструменты, обеспечивающие бесперебойные асинхронные операции без распределения. Дополнительную информацию см. здесь.

Как использовать конвейеры?

API находятся в nuget-пакете System.IO.Pipelines.

NET Server 2. Пример приложения сервера . здесь. 1, использующего конвейеры для обработки строчных сообщений (из примера выше) см. В примере ожидается передача данных от сокета на порту 8087, затем полученные сообщения записываются на консоль. Его можно запустить с помощью dotnet run (или Visual Studio). Отправьте строчное сообщение и посмотрите, как это работает. Для подключения к порту 8087 можно использовать клиент, например netcat или putty.

NET в будущем. На данный момент конвейер работает в Kestrel и SignalR, и мы надеемся, что она найдет более широкое применение во множестве сетевых библиотек и компонентов сообщества .

Теги
Показать больше

Похожие статьи

Добавить комментарий

Ваш e-mail не будет опубликован. Обязательные поля помечены *

Кнопка «Наверх»
Закрыть