Хабрахабр

RESTinio — это асинхронный HTTP-сервер. Простой пример из практики: отдача большого объема данных в ответ

Например, подключаясь к одному URL приложение должно было ограничить себя, скажем, 200KiB/sec. Недавно мне довелось поработать над приложением, которое должно было контролировать скорость своих исходящих подключений. А подключаясь к другому URL — всего 30KiB/sec.

Мне потребовался HTTP-сервер, который бы отдавал трафик с какой-то заданной скоростью, например, 512KiB/sec. Самым интересным моментом здесь оказалось тестирование этих самых ограничений. Тогда бы я мог видеть, действительно ли приложение выдерживает скорость 200KiB/sec или же оно срывается на более высокие скорости.

Но где взять такой HTTP-сервер?

Поскольку я имею некоторое отношение к встраиваемому в С++ приложения HTTP-серверу RESTinio, то не придумал ничего лучше, чем быстренько набросать на коленке простой тестовый HTTP-сервер, который способен отдавать клиенту длинный поток исходящих данных.

Заодно узнать в комментариях, действительно ли это просто или же я сам себя обманываю. О том, насколько это было просто и хотелось бы рассказать в статье. Асинхронный". В принципе, данную статью можно рассматривать как продолжение предыдущей статьи про RESTinio под названием "RESTinio — это асинхронный HTTP-сервер. Посему, если кому-то интересно прочитать о реальном, пусть и не очень серьезном применении RESTinio, то милости прошу под кат.

Когда таймер срабатывает, то клиенту отсылается очередной блок данных заданного размера. Общая идея упомянутого выше тестового сервера очень проста: когда клиент подключается к серверу и выполняет HTTP GET запрос, то взводится таймер, срабатывающий раз в секунду.

Но все несколько сложнее

Поскольку данные начнут скапливаться в сокете и ни к чему хорошему это не приведет. Если клиент вычитывает данные с меньшим темпом, нежели отсылает сервер, то просто отсылать по N килобайт раз в секунду не есть хорошая идея.

Пока сокет готов (т.е. Поэтому при отсылке данных желательно на стороне HTTP-сервера контролировать готовность сокета к записи. А вот если не готов, то нужно подождать пока сокет не перейдет в состояние готовности к записи. в нем не скопилось еще слишком много данных), то новую порцию отсылать можно.

Звучит разумно, но ведь операции ввода-вывода скрыты в потрохах RESTinio… Как тут узнать, можно ли записывать следующую порцию данных или нет?

Например, мы можем написать так: Из данной ситуации можно выйти, если использовать after-write нотификаторы, которые есть в RESTinio.

void request_handler(restinio::request_handle_t req) );
}

Соответственно, если сокет какое-то время был не готов к записи, то лямбда будет вызвана не сразу, а после того, как сокет придет в должное состояние и примет все исходящие данные. Лямбда, переданная в метод done() будет вызвана когда RESTinio завершит запись исходящих данных.

За счет использования after-write нотификаторов логика работы тестового сервера будет такой:

  • отсылаем очередную порцию данных, вычисляем время, когда нам нужно было бы отослать следующую порцию при нормальном развитии событий;
  • вешаем after-write нотификатор на очередную порцию данных;
  • когда after-write нотификатор вызывается, мы проверяем, наступило ли время отсылки следующей порции. Если наступило, то сразу же инициируем отсылку следующей порции. Если не наступило, то взводим таймер.

И возобновиться когда сокет будет готов принимать новые исходящие данные. В результате получится, что как только запись начнет притормаживать, отсылка новых данных приостановится.

И еще немного сложного: chunked_output

Самый простой способ, который применяется по умолчанию, в данном случае не подходит, т.к. RESTinio поддерживает три способа формирования ответа на HTTP-запрос. И такой поток, естественно, нельзя отдать в единственный вызов метода set_body. мне требуется практически бесконечный поток исходящих данных.

chunked_output. Поэтому в описываемом тестовом сервере используется т.н. при создании ответа я указываю RESTinio, что ответ будет формироваться частями. Т.е. После чего просто периодически вызываю методы append_chunk для добавления к ответу очередной части и flush для записи накопленных частей в сокет.

Начнем с функции request_processor, которая вызывается для обработки каждого корректного HTTP-запроса. Пожалуй, достаточно уже вступительных слов и пора перейти к рассмотрению самого кода, который можно найти в этом репозитории. Ну а затем уже посмотрим, как именно request_processor ставится в соответствие тому или иному входящему HTTP-запросу. При этом углубимся в те функции, которые из request_processor вызываются.

Функция request_processor и её подручные

Ей в качестве аргументов передаются: Функция request_processor вызывается для обработки нужных мне HTTP GET запросов.

  • Asio-шный io_context, на котором ведется вся работа (он потребуется, например, для взведения таймеров);
  • размер одной части ответа. Т.е. если мне нужно отдавать исходящий поток с темпом в 512KiB/sec, то в качестве этого параметра будет передано значение 512KiB;
  • количество частей в ответе. На случай, если поток должен иметь какую-то ограниченную длину. Например, если нужно отдавать поток с темпом 512KiB/sec в течении 5 минут, то в качестве этого параметра будет передано значение 300 (60 блоков в минуту в течении 5 минут);
  • ну и сам входящий запрос для обработки.

Внутри request_processor создается объект с информацией о запросе и параметрах его обработки, после чего эта самая обработка и начинается:

void request_processor( asio_ns::io_context & ctx, std::size_t chunk_size, std::size_t count, restinio::request_handle_t req) { auto data = std::make_shared<response_data>( ctx, chunk_size, req->create_response<output_t>(), count); data->response_ .append_header(restinio::http_field::server, "RESTinio") .append_header_date_field() .append_header( restinio::http_field::content_type, "text/plain; charset=utf-8") .flush(); send_next_portion(data);
}

Тип response_data, содержащий все относящиеся к запросу параметры, выглядит следующим образом:

struct response_data { asio_ns::io_context & io_ctx_; std::size_t chunk_size_; response_t response_; std::size_t counter_; response_data( asio_ns::io_context & io_ctx, std::size_t chunk_size, response_t response, std::size_t counter) : io_ctx_{io_ctx} , chunk_size_{chunk_size} , response_{std::move(response)} , counter_{counter} {}
};

Поэтому этот объект нельзя просто так захватить в лямбда-функции, которая затем оборачивается в std::function. Тут нужно заметить, что одна из причин появления структуры response_data состоит в том, что объект типа restinio::response_builder_t<restinio::chunked_output_t> (а именно этот тип спрятан за коротким псевдонимом response_t) является moveable-, но не copyable-типом (по аналогии с std::unique_ptr). Но если объект-response поместить в динамически созданный экземпляр response_data, то умный указатель на экземпляр reponse_data уже можно без проблем захватывать в лямбда-функции с последующим сохранением этой лямбды в std::function.

Функция send_next_portion

Ничего сложного в ней не происходит, поэтому выглядит она достаточно просто и лаконично: Функция send_next_portion вызывается каждый раз, когда требуется отослать клиенту очередную часть ответа.

void send_next_portion(response_data_shptr data) { data->response_.append_chunk(make_buffer(data->chunk_size_)); if(1u == data->counter_) { data->response_.flush(); data->response_.done(); } else { data->counter_ -= 1u; data->response_.flush(make_done_handler(data)); }
}

отсылаем очередную часть. Т.е. А если не последняя, то в метод flush передается after-write нотификатор, который создается, пожалуй, наиболее сложной функцией данного примера. И, если эта часть была последней, то завершаем обработку запроса.

Функция make_done_handler

Этот нотификатор должен проверить, завершилась ли запись очередной части ответа успешно. Функция make_done_handler отвечает за создание лямбды, которая будет передана в RESTinio в качестве after-write нотификатора. были "тормоза" в сокете и темп отсылки выдерживать не получается), либо же после некоторой паузы. Если да, то нужно разобраться, следует ли следующую часть отослать сразу же (т.е. Если нужна пауза, то она обеспечивается через взведение таймера.

Которому не так уж и мало лет чтобы называться современным 😉 В общем-то, несложные действия, но в коде получается лямбда внутри лямбды, что может смутить людей, не привыкших к "современному" С++.

auto make_done_handler(response_data_shptr data) { const auto next_timepoint = steady_clock::now() + 1s; return [=](const auto & ec) { if(!ec) { const auto now = steady_clock::now(); if(now < next_timepoint) { auto timer = std::make_shared<asio_ns::steady_timer>(data->io_ctx_); timer->expires_after(next_timepoint - now); timer->async_wait([timer, data](const auto & ec) { if(!ec) send_next_portion(data); }); } else data->io_ctx_.post([data] { send_next_portion(data); }); } };
}

По-моему, получается как-то слишком уж многословно. На мой взгляд, основная сложность в этом коде проистекает из-за особенностей создания и "взвода" таймеров в Asio. Зато не нужно никаких дополнительных библиотек привлекать. Но тут уж что есть, то есть.

Подключение express-like роутера

Показанные выше request_processor, send_next_portion и make_done_handler в общем-то и составляли самую первую версию моего тестового сервера, написанного буквально за 15 или 20 минут.

Скомпилировал со скоростью 512KiB/sec — отдает всем 512KiB/sec. Но через пару дней использования этого тестового сервера оказалось, что в нем есть серьезный недостаток: он всегда отдает ответный поток с одинаковой скоростью. Что было неудобно, т.к. Перекомпилировал со скоростью 20KiB/sec — будет отдавать всем 20KiB/sec и никак иначе. стало нужно иметь возможность получать ответы разной "толщины".

Например, сделали запрос на localhost:8080/ и получили ответ с заранее заданной скоростью. Тогда и появилась идея: а что, если скорость отдачи будет запрашиваться прямо в URL? А если сделали запрос на localhost:8080/128K, то стали получать ответ со скоростью 128KiB/sec.

Т.е. Потом мысль пошла еще дальше: в URL также можно задавать и количество отдельных частей в ответе. запрос localhost:8080/128K/3000 приведет к выдаче потока из 3000 частей со скоростью 128KiB/sec.

В RESTinio есть возможность использовать маршрутизатор запросов, сделанный под влиянием ExpressJS. Нет проблем. В итоге появилась вот такая функция описания обработчиков входящих HTTP-запросов:

auto make_router(asio_ns::io_context & ctx) { auto router = std::make_unique<router_t>(); router->http_get("/", [&ctx](auto req, auto) { request_processor(ctx, 100u*1024u, 10000u, std::move(req)); return restinio::request_accepted(); }); router->http_get( R"(/:value(\d+):multiplier([MmKkBb]?))", [&ctx](auto req, auto params) { const auto chunk_size = extract_chunk_size(params); if(0u != chunk_size) { request_processor(ctx, chunk_size, 10000u, std::move(req)); return restinio::request_accepted(); } else return restinio::request_rejected(); }); router->http_get( R"(/:value(\d+):multiplier([MmKkBb]?)/:count(\d+))", [&ctx](auto req, auto params) { const auto chunk_size = extract_chunk_size(params); const auto count = restinio::cast_to<std::size_t>(params["count"]); if(0u != chunk_size && 0u != count) { request_processor(ctx, chunk_size, count, std::move(req)); return restinio::request_accepted(); } else return restinio::request_rejected(); }); return router;
}

Здесь формируются обработчики HTTP GET запросов для URL трех типов:

  • вида http://localhost/;
  • вида http://localhost/<speed>[<U>]/;
  • вида http://localhost/<speed>[<U>]/<count>/

Так 128 или 128b означает скорость в 128 байт в секунду. Где speed — это число, определяющее скорость, а U — это опциональный мультипликатор, который указывает, в каких единицах задана скорость. А 128k — 128 килобайт в секунду.

На каждый URL вешается своя лямбда функция, которая разбирается с полученными параметрами, если все нормально, вызывает уже показанную выше функцию request_processor.

Вспомогательная функция extract_chunk_size выглядит следующим образом:

std::size_t extract_chunk_size(const restinio::router::route_params_t & params) { const auto multiplier = [](const auto sv) noexcept -> std::size_t { if(sv.empty() || "B" == sv || "b" == sv) return 1u; else if("K" == sv || "k" == sv) return 1024u; else return 1024u*1024u; }; return restinio::cast_to<std::size_t>(params["value"]) * multiplier(params["multiplier"]);
}

Здесь C++ная лямбда используется для эмуляции локальных функций из других языков программирования.

Функция main

Осталось посмотреть, как все это запускается в функции main:

using router_t = restinio::router::express_router_t<>;
...
int main() { struct traits_t : public restinio::default_single_thread_traits_t { using logger_t = restinio::single_threaded_ostream_logger_t; using request_handler_t = router_t; }; asio_ns::io_context io_ctx; restinio::run( io_ctx, restinio::on_this_thread<traits_t>() .port(8080) .address("localhost") .write_http_response_timelimit(60s) .request_handler(make_router(io_ctx))); return 0;
}

Что здесь происходит:

  1. Поскольку мне нужен не обычный штатный роутер запросов (который вообще ничего делать сам не может и перекладывает всю работу на плечи программиста), то я определяю новые свойства для своего HTTP-сервера. Для этого беру штатные свойства однопоточного HTTP-сервера (тип restinio::default_single_thread_traits_t) и указываю, что в качестве обработчика запросов будет использоваться экземпляр express-like роутера. Заодно, чтобы контролировать, что происходит внутри, указываю, чтобы HTTP-сервер использовал настоящий логгер (по умолчанию используется null_logger_t который вообще ничего не логирует).
  2. Поскольку мне нужно взводить таймеры внутри after-write нотификаторов, то мне нужен экземпляр io_context, с которым я смог бы работать. Поэтому я его создаю сам. Это дает мне возможность передать ссылку на мой io_context в функцию make_router.
  3. Остается только запустить HTTP-сервер в однопоточном варианте на ранее созданном мной io_context-е. Функция restinio::run вернет управление только когда HTTP-сервер завершит свою работу.

Полный код, которого чуть-чуть больше из-за дополнительных typedef-ов и вспомогательных функций, несколько подлиннее. В статье не был показан полный код моего тестового сервера, только его основные моменты. На момент написания статьи это 185 строк, включая пустые строки и комментарии. Увидеть его можно здесь. Ну и написаны эти 185 строк за пару-тройку подходов суммарной длительностью вряд ли более часа.

В практическом плане быстро был получен нужный мне вспомогательный инструмент. Мне такой результат понравился и задача оказалась интересной. И в плане дальнейшего развития RESTinio появились кое-какие мысли.

Сам проект живет на BitBucket, есть зеркало на GitHub. В общем, если кто-то еще не пробовал RESTinio, то я приглашаю попробовать. Задать вопрос или высказать свои предложения можно в Google-группе или прямо здесь, в комментариях.

Теги
Показать больше

Похожие статьи

Добавить комментарий

Ваш e-mail не будет опубликован. Обязательные поля помечены *

Кнопка «Наверх»
Закрыть