Хабрахабр

[Из песочницы] Использование Boost.Asio с Coroutines TS

Использование функций обратного вызова (callback) — популярный подход к построению сетевых приложений с использованием библиотеки Boost.Asio (и не только ее). Проблемой этого подхода является ухудшение читабельности и поддерживаемости кода при усложнении логики протокола обмена данными [1].

Как альтернатива коллбекам, сопрограммы (coroutines) можно применить для написания асинхронного кода, уровень читабельности которого будет близок к читабельности синхронного кода. Boost.Asio поддерживает такой подход, предоставляя возможность использования библиотеки Boost.Coroutine для обработки коллбеков.

Boost.Coroutine реализует сопрограммы с помощью сохранения контекста выполнения текущего потока. Этот подход конкурировал за включение в следующую редакцию стандарта C++ с предложением от Microsoft, которое вводит новые ключевые слова co_return, co_yield и co_await. Предложение Microsoft получило статус Technical Specification (TS) [2] и имеет высокие шансы стать стандартом.

Статья [3] демонстрирует использование Boost.Asio с Coroutines TS и boost::future. В своей статье я хочу показать, как можно обойтись без boost::future. Мы возьмем за основу пример асинхронного TCP эхо-сервера из Boost.Asio и будем его модифицировать, используя сопрограммы из Coroutines TS.

На момент написания статьи Coroutines TS реализована в компиляторах Visual C++ 2017 и clang 5.0. Мы будем использовать clang. Необходимо установить флаги компилятора для включения экспериментальной поддержки стандарта C++ 20 (-std=c++2a) и Coroutines TS (-fcoroutines-ts). Также нужно включить заголовочный файл <experimental/coroutine>.

В оригинальном примере функция для чтения из сокета выглядит так:

void do_read() { auto self(shared_from_this()); socket_.async_read_some( boost::asio::buffer(data_, max_length), [this, self](boost::system::error_code ec, std::size_t length) { if (!ec) { do_write(length); } });
}

Мы инициируем асинхронное чтение из сокета и задаем коллбек, который будет вызван при получении данных и инициирует их отсылку обратно. Функция записи в оригинале выглядит так:

void do_write(std::size_t length) { auto self(shared_from_this()); boost::asio::async_write( socket_, boost::asio::buffer(data_, length), [this, self](boost::system::error_code ec, std::size_t /*length*/) { if (!ec) { do_read(); } });
}

При успешной записи данных в сокет мы снова инициируем асинхронное чтение. По сути, логика программы сводится к циклу (псевдокод):

while (!ec)
{ ec = read(buffer); if (!ec) { ec = write(buffer); }
}

Было бы удобно закодировать это в виде явного цикла, однако в таком случае нам пришлось бы сделать чтение и запись синхронными операциями. Нам это не подходит, поскольку мы хотим обслуживать несколько клиентских сессий в одном потоке выполнения одновременно. На помощь приходят сопрограммы. Перепишем функцию do_read() в следующем виде:

void do_read() { auto self(shared_from_this()); const auto[ec, length] = co_await async_read_some( socket_, boost::asio::buffer(data_, max_length)); if (!ec) { do_write(length); }
}

Использование ключевого слова co_await (а также co_yield и co_return) превращает функцию в сопрограмму. Такая функция имеет несколько точек (suspension point), где ее выполнение приостанавливается (suspend) с сохранением состояния (значений локальных переменных). Позже выполнение сопрограммы может быть возобновлено (resume), начиная с последней остановки. Ключевое слово co_await в нашей функции создает suspension point: после того, как асинхронное чтение инициировано, выполнение сопрограммы do_read() будет приостановлено до завершения чтения. Возврата из функции при этом не происходит, но выполнение программы продолжается, начиная с точки вызова сопрограммы. Когда клиент подключается, вызывается session::start(), где do_read() вызывается первый раз для этой сессии. После начала асинхронного чтения продолжается выполнение функции start(), происходит возврат из нее и инициируется прием следующего соединения. Далее продолжает выполнение код из Asio, который вызвал обработчик-аргумент async_accept().

Для того, чтобы магия co_await работала, его выражение — в нашем случае функция async_read_some() — должен возвращать объект класса, который соответствует определенному контракту. Реализация async_read_some() взята из комментария к статье [3].

template <typename SyncReadStream, typename DynamicBuffer>
auto async_read_some(SyncReadStream &s, DynamicBuffer &&buffers) { struct Awaiter { SyncReadStream &s; DynamicBuffer &&buffers; std::error_code ec; size_t sz; bool await_ready() { return false; } void await_suspend(std::experimental::coroutine_handle<> coro) { s.async_read_some(std::move(buffers), [this, coro](auto ec, auto sz) mutable { this->ec = ec; this->sz = sz; coro.resume(); }); } auto await_resume() { return std::make_pair(ec, sz); } }; return Awaiter{s, std::forward<DynamicBuffer>(buffers)};
}

async_read_some() возвращает объект класса Awaiter, который реализует контракт, требуемый co_await:

  • await_ready() вызывается в начале ожидания для проверки, готов ли уже результат асинхронной операции. Поскольку для получения результата нам всегда нужно дождаться, пока данные будут прочитаны, мы возвращаем false.
  • await_suspend() вызывается перед тем, как вызывающая сопрограмма будет приостановлена. Здесь мы инициируем асинхронное чтение и передаем обработчик, который сохранит результаты выполнения асинхронной операции в переменных-членах класса Awaiter и возобновит сопрограмму.
  • await_resume() — возвращаемое значение этой функции будет результатом выполнения co_await. Просто возвращаем сохраненные ранее результаты выполнения асинхронной операции.

Если теперь попытаться собрать нашу программу, то получим ошибку компиляции:

error: this function cannot be a coroutine: 'std::experimental::coroutines_v1::coroutine_traits<void, session &>' has no member named 'promise_type' void do_read() { ^

Причина в том, что компилятор требует, чтобы для сопрограммы тоже был реализован некоторый контракт. Это делается с помощью специализации шаблона std::experimental::coroutine_traits:

template <typename... Args>
struct std::experimental::coroutine_traits<void, Args...> { struct promise_type { void get_return_object() {} std::experimental::suspend_never initial_suspend() { return {}; } std::experimental::suspend_never final_suspend() { return {}; } void return_void() {} void unhandled_exception() { std::terminate(); } };
};

Мы специализировали coroutine_traits для сопрограмм с возращаемым значением типа void и любым количеством и типами параметров. Сопрограмма do_read() подходит под это описание. Специализация шаблона содержит тип promise_type с следующими функциями:

  • get_return_object() вызывается для создания объекта, который сопрограмма будет впоследствии заполнять и возвращать. В нашем случае ничего создавать не нужно, так как do_read() ничего не возвращает.
  • initial_suspend() определяет, будет ли сопрограмма приостановлена перед первым вызовом. Аналогия — запуск приостановленного потока в Windows. Нам нужно, чтобы do_read() выполнялась без начальной остановки, поэтому возвращаем suspend_never.
  • final_suspend() определяет, будет ли сопрограмма приостановлена перед возвратом значения и завершением. Возвращаем suspend_never.
  • return_void() указывает компилятору, что сопрограмма ничего не возвращает.
  • unhandled_exception() вызывается, если внутри сопрограммы было сгенерировано исключение, и оно не было обработано внутри сопрограммы. В этом случае аварийно завершаем программу.

Теперь можно запустить сервер и проверить его работоспособность, открыв несколько подключений с помощью telnet.

Функция записи do_write() все еще основывается на использовании коллбека. Исправим это. Перепишем do_write() в следующем виде:

auto do_write(std::size_t length) { auto self(shared_from_this()); struct Awaiter { std::shared_ptr<session> ssn; std::size_t length; std::error_code ec; bool await_ready() { return false; } auto await_resume() { return ec; } void await_suspend(std::experimental::coroutine_handle<> coro) { const auto[ec, sz] = co_await async_write( ssn->socket_, boost::asio::buffer(ssn->data_, length)); this->ec = ec; coro.resume(); } }; return Awaiter{self, length};
}

Напишем awaitable-обертку для записи в сокет:

template <typename SyncReadStream, typename DynamicBuffer>
auto async_write(SyncReadStream &s, DynamicBuffer &&buffers) { struct Awaiter { SyncReadStream &s; DynamicBuffer &&buffers; std::error_code ec; size_t sz; bool await_ready() { return false; } auto await_resume() { return std::make_pair(ec, sz); } void await_suspend(std::experimental::coroutine_handle<> coro) { boost::asio::async_write( s, std::move(buffers), [this, coro](auto ec, auto sz) mutable { this->ec = ec; this->sz = sz; coro.resume(); }); } }; return Awaiter{s, std::forward<DynamicBuffer>(buffers)};
}

Последний шаг — перепишем do_read() в виде явного цикла:

void do_read() { auto self(shared_from_this()); while (true) { const auto[ec, sz] = co_await async_read_some( socket_, boost::asio::buffer(data_, max_length)); if (!ec) { auto ec = co_await do_write(sz); if (ec) { std::cout << "Error writing to socket: " << ec << std::endl; break; } } else { std::cout << "Error reading from socket: " << ec << std::endl; break; } }
}

Логика программы теперь записана в виде, близком к синхронному коду, однако она выполняется асинхронно. Ложкой дёгтя является то, что нам пришлось написать дополнительный awaitable-класс для возвращаемого значения do_write(). Это иллюстрирует один из недостатков Coroutines TS — распространение co_await вверх по стеку асинхронных вызовов [4].

Переделку функции server::do_accept() в сопрограмму оставим в качестве упражнения. Полный текст программы можно найти на GitHub.

Мы рассмотрели использование Boost.Asio с Coroutines TS для программирования асинхронных сетевых приложений. Преимущество такого подхода — улучшение читабельности кода, поскольку он становится близок по форме к синхронному. Недостаток — необходимость в написании дополнительных оберток для поддержки модели сопрограмм, реализованной в Coroutines TS.

  1. Асинхронность: назад в будущее
  2. Working Draft, Technical Specification for C++ Extensions for Coroutines
  3. Using C++ Coroutines with Boost C++ Libraries
  4. Возражения против принятия Coroutines с await в C++17
Показать больше

Похожие публикации

Кнопка «Наверх»