Хабрахабр

Добавляем распределенность в SObjectizer-5 с помощью MQTT и libmosquitto

Когда-то в SObjectizer-4 «из коробки» была доступна возможность построения распределенных приложений. Но не всегда это работало так хорошо, как хотелось бы. В итоге в SObjectizer-5 от поддержки распределенности в самом ядре SObjectizer-а мы отказались (подробнее этот вопрос рассматривается здесь). Отказались в пользу того, чтобы под конкретную задачу можно было выбрать конкретный транспорт с учетом особенностей этой самой задачи. Написав для этого соответствующую обвязку, которая будет скрывать от программиста детали работы выбранного транспорта.

В данной статье мы попробуем рассказать об одной такой обвязке вокруг MQTT и libmosquttio, посредством которой была реализована возможность взаимодействия частей распределенного приложения.

Пару лет назад мы помогали одной команде с разработкой прототипа распределенной измерительной системы. Прототип заработал и, насколько нам известно, работает в демонстрационном режиме до сих пор. К сожалению, у команды возникли сложности с поиском дальнейшего финансирования и проект заморозили. Из-за «подвешенного» статуса проекта мы сможем рассказать только то, что сможем рассказать, без имен, явок и паролей подробностей.

Планировалось иметь центральный сервер, куда стекается и где обрабатывается вся собранная информация. Итак, была идея создания распределенной системы сбора некоторой измерительной информации. А также некоторое множество (от нескольких сотен до нескольких тысяч) «измерительных станций».

А уже дальше станции сами в любой момент могут отдавать информацию центральному серверу, при этом центральный сервер в любой момент может выдавать новые команды измерительным станциями. Первоначально центральный сервер раздает команды на сбор информации на измерительные станции.

Как из-за экономии ресурсов этих самых станций, так и из-за необходимости местами работать с системным API. Для реализации начинки измерительных станций решили использовать C++.

Хотя в рамках прототипа на C++ был написан демонстрационный «сервер» на C++, который поддерживал несколько сценариев сбора информации от измерительных станций, но без обработки этой информации. В реализации же центрального сервера можно было использовать использовать Java и/или что-то более удобное и безопасное, нежели C++.

MQTT в качестве транспорта

Итак, есть измерительная станция, на которой крутится написанный на C++ процесс, который должен взаимодействовать с центральным сервером: получать от сервера команды, отдавать серверу результаты выполнения команд, контролировать наличие связи с сервером, переподключаться к серверу при разрывах связи и т.д.

Возникал естественный вопрос: какой именно транспорт будет использоваться для общения центрального сервера с измерительными станциями?

Остановились на протоколе MQTT, который: Нужно сказать, что ответ на этот вопросы искали не очень долго.

  • имеет официальную спецификацию и существуют реализации этой спецификации для разных языков программирования;
  • очень простой и легковесный. Т.е. даже если придется для каких-то совсем уж специфических условий написать свою реализацию этого протокола, то это не составит большого труда;
  • позволяет легко обмениваться информацией в обоих направлениях;
  • предполагает использование простой, но мощной и удобной модели Publish/Subscribe.

В общем, по нашему мнению, раз уж MQTT для такого рода задач создавался, то грех его в подобных условиях не использовать.

Сами данные было решено обрамлять в JSON. Но MQTT — это транспорт. Опять же, для любого мало-мальски востребованного языка программирования есть библиотеки, позволяющие работать с JSON-ом. Благо, JSON достаточно простой, понятный и легковесный.

Вот о том, как мы это сделали и поговорим дальше. Таким образом мы столкнулись с тем, что в своем C++ коде мы должны подружить SObjectizer с MQTT и JSON-ом.

Мы сделали небольшую надстройку над SObjectizr-ом и libmosquitto под названием mosquitto_transport. Эта надстройка берет на себя задачи взаимодействия с MQTT-брокером и предоставляет разработчику небольшой API для публикации и подписки сообщения.

В общем, в ней реализован тот минимум, который потребовался для прототипа. Библиотека mosquitto_transport написана на C++14, работает под Linux, поддерживает лишь часть возможностей MQTT (в частности, используется только QoS=0).

Как выглядит использование mosquitto_transport?

Для использования mosquitto_transport разработчику нужно сделать несколько обязательных шагов, которые мы подробнее рассмотрим ниже.

Но такое длинное имя использовать в примерах неудобно. Библиотека mosquitto_transport размещает все свое содержимое в пространстве имен mosquitto_transport. Поэтому далее местами будет использоваться синоним mosqtt, который вводится следующим образом: namespace mosqtt = mosquitto_transport;

Шаг первый. Инициализация mosquitto_transport и libmosquitto

В качестве базы у mosquitto_transport лежит libmosquitto, а libmosquitto нуждается в явной инициализации в начале работы программы и явной деинициализации при завершении программы. Поэтому для работы с mosquitto_transport нужно создать где-то в программе объект типа lib_initializer_t. Этот объект в своем конструкторе проинициализирует libmosquitto, а в деструкторе — выполнит деинициализацию. Ссылку на lib_initializer_t нужно будет затем передать в конструктор агента a_transport_manager_t.

Обычно в программе это выглядит следующим образом:

int main() { mosqtt::lib_initializer_t mosqtt_lib; ...
}

Шаг второй. Запуск агента a_transport_manager_t и получение экземпляра instance_t

Весь поток входящих и исходящих сообщений обслуживает специальный агент a_transport_manager_t. Разработчик должен сам создать агента этого типа у себя в программе и зарегистрировать этого агента в составе какой-нибудь кооперации.

Именно этот instance_t будет нужен для публикации и получения сообщений. Кроме собственно создания и регистрации агента a_transport_manager_t, нужно сделать еще одну важную вещь: взять у агента экземпляр объекта специального типа instance_t.

Если в приложении работают всего несколько агентов, то их всех можно поместить в одну кооперацию с агентом a_transport_manager_t:

int main() ); });
}

Но в более сложных случаях, когда прикладных агентов больше и/или прикладные агенты могут возникать и исчезать по ходу работы приложения, имеет смысл создать отдельную кооперацию с a_transport_manager_t и привязать этого агента к собственному диспетчеру. В этом случае может быть разумным выделить создание a_transport_manager_t в отдельную вспомогательную функцию:

mosqtt::instance_t make_transport( so_5::environment_t & env, mosqtt::lib_initializer_t & mosqtt_lib, ... /* Какие-то дополнительные параметры */) { mosqtt::instance_t instance; env.introduce_coop( // Транспортный агент будет работать на собственной рабочей нити. so_5::disp::one_thread::create_private_disp(env)->binder(), [&](so_5::coop_t & coop) { auto transport = coop.make_agent<mosqtt::a_transport_manager_t>( std::ref{mosqtt_lib}, ... /* Дополнительные параметры */); instance = transport->instance(); }); return instance;
}

Что позволяет затем использовать make_transport, например, следующим образом:

int main() { ... mosqtt::lib_initializer_t mosqtt_lib; ... so_5::launch([&](so_5::environment_t & env) { // Запускаем MQTT-транспорт. auto mqtt = make_transport(env, mosqtt_lib, ...); // Регистрируем первую кооперацию с прикладными агентами приложения. env.introduce_coop([&](so_5::coop_t & coop) { coop.make_agent<first_app_agent>(..., mqtt, ...); coop.make_agent<second_app_agent>(..., mqtt, ...); coop.make_agent<third_app_agent>(..., mqtt, ...); }); // Регистрируем вторую кооперацию с прикладными агентами приложения. env.introduce_coop(...); ... // И так далее. });
}

Примечательной особенностью агента a_transport_manager_t является то, что ряд обработчиков событий у него помечены как thread-safe. Что делает возможным привязку этого агента к диспетчеру типа adv_thread_pool. В этом случае агент сможет часть своих событий обрабатывать параллельно сразу на нескольких рабочих нитях. Хотя нам на практике эта функциональность не потребовалась.

Шаг третий. Реализация сериализации/десериализации прикладных сообщений

Самая важная часть при использовании mosquitto_transport — это реализация формата представления передаваемых сообщений. Например, если у нас есть сообщение вида:

struct sensor_data_t { data_category_t category_; timestamp_t when_; int value_; ...
};

то для передачи его MQTT-брокеру и далее подписчикам следует преобразовать сообщение в какой-то формат. Например, в XML, JSON или ProtoBuf. Для того, чтобы это происходило автоматически во время работы с mosquitto_transport разработчик должен реализовать сериализацию/десериализацию. Подробнее этот вопрос рассматривается ниже, пока же покажем основную идею.

Например, это может быть текстовое JSON-представление. Во-первых, разработчик должен определиться с тем, как данные будут представлены. Может быть и то, и другое — mosquitto_transport этому не препятствует, но разработчик сам должен следить за тем, какой формат сообщений в каких топиках у него используется. Или же бинарное ProtoBuf представление.

Например, если мы хотим использовать только JSON, то мы определяем тип-тег следующего вида: Во-вторых, для каждого своего формата разработчик должен определить тип-тег.

struct json_encoding {};

Этот тип-тег потребуется для параметризации шаблонов и выбора перегруженных функций.

В-третьих, разработчик должен для каждого своего форма определить в пространстве имен mosquitto_transport частичную специализацию для шаблонных классов encoder_t и decoder_t.
Чтобы получилось что-то вроде:

// Это наш тип-тег, который определяет формат представления данных.
struct my_encoding_tag {}; // Реализация сериализации/десериализации данных через специализацию
// типов внутри пространства имен mosquitto_transport.
namespace mosquitto_transport { // Реализация сериализации.
template<typename Msg>
struct encoder_t<my_encoding_tag, Msg> { static std::string encode(const Msg & what) { ... // Тут какой-то код по сериализации. }
}; // Реализация десериализации.
template<typename Msg>
struct decoder_t<my_encoding_tag, Msg> { static Msg decode(const std::string & payload) { ... // Тут какой-то код по десериализации. }
}; } /* namespace mosquitto_transport */

Далее операции публикации сообщений и подписки на сообщения нужно будет указывать тип-тег в качестве маркера, благодаря которому mosquitto_transport будет понимать, как именно нужно сериализовать и десериализовать сообщения.

Последующие шаги. Публикация и получение сообщений

После того как три вышеозначенных обязательных шага выполнены, можно выполнять операции публикации сообщений и подписки на сообщения.

Публикация выглядит следующим образом:

mosqtt::topic_publisher_t<Tag_Type>::publish( // Объект instance, полученный от a_transport_manager_t. instance, // Имя топика на MQTT-брокере, в который будет выполняться // публикация сообщения. "some/topic/name", // Объект, чье содержимое будет опубликовано. some_message_instance);

Такой операций мы говорим, что хотим опубликовать сообщение в конкретный топик и что это сообщение должно быть сериализовано соответствующим образом (формат определяет шаблонный параметр Tag_Type). Например, мы могли бы написать так:

mosqtt::topic_publisher_t<my_encoding_tag>::publish( instance, "/sensor/1023/updates", sensor_data_t{ data_category_t::recent_value, current_timestamp, value});

В этом случае mosquitto_transport сериализовал бы данные из сообщения типа sensor_data_t в формат, определенный для my_encoding_tag, и опубликовал бы получившееся сообщение в топик "/sensor/1023/updates".

Например, если мы хотим получать все сообщения из топиков "/sensor/+/updates", то мы можем сделать вот такой вызов subscribe() внутри какого-то из своих агентов: Для того, чтобы получать сообщения, которые кто-то публикует в интересующие нас топики нужно сделать как бы «двойную подписку»: нужно вызвать специальный метод subscribe() и внутри этого метода подписаться на сообщения из специального mbox-а.

class my_agent_t : public so_5::agent_t { ... void so_define_agent() override { mosqtt::topic_subscriber_t<my_encoding_tag>::subscribe( // Объект instance, полученный от a_transport_manager_t. instance_, // Топик, на который хотим подписаться. "sensor/+/updates", // Лямбда, внутри которой будет происходить подписка на // входящие сообщения. [this](so_5::mbox_t mbox) { // Сама подписка для того, чтобы агент мог получать // входящие сообщения. so_subscribe(mbox).event(&my_agent_t::on_sensor_data); }); } ... void on_sensor_data( mhood_t<mosqtt::incoming_message_t<my_encodig_tag>> cmd) { ... }
};

Двойная подписка здесь нужна потому, что у нас, по сути, есть два разных действия:

  • во-первых, нам нужно заставить transport_manager-а подписаться на конкретный топик на MQTT-брокере. Без этой подписки брокер просто не будет доставлять нам новые входящие сообщения. Собственно, именно это и делает метод subscribe();
  • во-вторых, когда MQTT-брокер доставляет нам сообщение из подписанного нами топика, то нужно доставить это сообщение до агента-получателя. Для этого должен использоваться механизм подписок SObjectizer-а. Как раз поэтому внутри subscribe() создается новый mbox, который будет связан с конкретной подпиской на MQTT-брокере. Когда брокер отдаст нам новое сообщение, то это сообщение будет отослано в mbox. Соответственно, сообщение дойдет до агента, который подписался на этот mbox.

Простейший Hello, World

Для демонстрации mosquitto_transport «в деле» сделаем типичный хелло-ворлдовский пример. В пример входят два приложения:

  • demo_server_app. Подключается к локальному MQTT-шному брокеру и подписывается на сообщения из топиков "/client/+/hello". Когда сообщение приходит, отвечает ответным сообщением в топик "/client/:ID/replies";
  • demo_client_app. Подключается к локальному MQTT-шному брокеру, подписывается на топик "/client/:ID/replies" и публикует сообщение в топик "/client/:ID/hello". Завершает свою работу как только в "/client/:ID/replies" приходит ответ.

В качестве ":ID" в именах топиков будет использоваться целочисленный идентификатор, который demo_client_app генерирует случайным образом.

Сообщения передаются в JSON-формате.

Мы же пройдемся по основным моментам реализации примера. Исходные тексты примера можно найти в этом репозитории.

Определение сообщений для взаимодействия «клиента» и «сервера»

В заголовочном файле demo/messages.hpp сделаны определения структур данных, которые выступают в качестве сообщений для взаимодействия «клиента» и «сервера». Клиент отсылает серверу сообщение типа client_hello_t, а сервер в ответ присылает сообщение типа server_hello_t. Конечно же, в виде MQTT-сообщений летают сериализованные в JSON представления этих сообщений. Но в программе работа идет именно с типами client_hello_t/server_hello_t.

И делаем частичную специализацию encoder_t и decoder_t в соответствии с тем, как мы об этом говорили выше. Для того, чтобы сообщения типов client_hello_t/server_hello_t сериализовались и десериализовались должным образом, мы определяем тип-тег с именем json_format_t. Поэтому внутри наших типов client_hello_t/server_hello_t есть шаблонные методы json_io — это как раз специфика json_dto. Для работы с JSON-ом мы используем небольшую надстройку над RapidJSON под названием json_dto.

Итак, вот что находится в messages.hpp:

#pragma once #include <json_dto/pub.hpp> #include <mosquitto_transport/pub.hpp> namespace demo { // Тип-тег, который будет указывать в каком формате должны быть
// сериализованы сообщения.
struct json_format_t {}; // Удобные имена для использования.
using publisher_t = mosquitto_transport::topic_publisher_t<json_format_t>;
using subscriber_t = mosquitto_transport::topic_subscriber_t<json_format_t>; // Сообщение о появлении нового клиента.
struct client_hello_t { int id_; std::string greeting_; std::string reply_topic_; template<typename Json_Io> void json_io(Json_Io & io) { using namespace json_dto; io & mandatory("id", id_) & mandatory("greeting", greeting_) & mandatory("reply_topic", reply_topic_); }
}; // Ответное сообщение от сервера.
struct server_hello_t { std::string greeting_; template<typename Json_Io> void json_io(Json_Io & io) { io & json_dto::mandatory("greeting", greeting_); }
}; } /* namespace demo */ // Средства для сериализации-десериализации сообщений.
namespace mosquitto_transport { template<typename Msg>
struct encoder_t<demo::json_format_t, Msg> { static std::string encode(const Msg & msg) { return json_dto::to_json(msg); }
}; template<typename Msg>
struct decoder_t<demo::json_format_t, Msg> { static Msg decode(const std::string & json) { return json_dto::from_json<Msg>(json); }
}; } /* namespace mosquitto_transport */

Отдельного пояснения, возможно, заслуживают определения имен publisher_t и subscriber_t. Они нужны для того, чтобы в прикладном коде можно было писать, например:

publisher_t::publish(...)

вместо:

mosqtt::topic_publisher_t<json_format_t>::publish(...)

Общий утилитарный код

Еще один заголовочный файл, demo/common.hpp, содержит вспомогательные функции, который нужны как клиенту, так и серверу.

Нам нужно два таких объекта. Функция make_loggers(), которую мы рассматривать не будем, создает объекты для логгирования. Второй — демонстрационным приложением. Первый будет использоваться mosquitto_transport-ом. Т.е. В примере для логгеров уровень детализации выставляется в spdlog::level::trace. логироваться будет все, что позволяет наблюдать практически за всем, что происходит внутри приложения.

Возвращает run_transport_manager() экземпляр типа mosquitto_transport::instance_t, который затем потребуется для публикации и подписки: А вот функция run_transport_manager(), код которой мы посмотрим, служит как раз для создания и запуска агента a_transport_manager_t.

auto run_transport_manager( so_5::environment_t & env, const std::string & client_id, std::shared_ptr<spdlog::logger> logger) { mosquitto_transport::instance_t mqtt; env.introduce_coop([&](so_5::coop_t & coop) { auto lib = coop.take_under_control( std::make_unique<mosquitto_transport::lib_initializer_t>()); auto tm = coop.make_agent<mosquitto_transport::a_transport_manager_t>( std::ref(*lib), mosquitto_transport::connection_params_t{ client_id, "localhost", 1883u, 60u}, std::move(logger)); mqtt = tm->instance(); }); return mqtt;
}

Здесь создается новая кооперация, которая содержит единственного агента — a_transport_manager_t. Этому агенту в конструктор передаются параметры подключения к MQTT-брокеру (поскольку это хелло-ворловский пример, то достаточно MQTT-брокера на localhost-е) и логгер, который следует использовать. Но самое любопытное здесь — это создание объекта lib_initializer_t. Он создается динамически, а ответственность за его удаление перекладывается на кооперацию с агентом a_transport_manager_t. Тем самым у нас получается, что libmosquitto будет проинициализирована непосредственно перед созданием агента a_transport_manager_t, а деинициализированна она будет уже после того, как агент a_transport_manager_t перестанет существовать.

Основная часть demo_server_app

В demo_server_app всю прикладную логику выполняет агент listener_t, реализованный следующим образом:

class listener_t final : public so_5::agent_t {
public: listener_t( context_t ctx, mosqtt::instance_t mqtt, std::shared_ptr<spdlog::logger> logger) : so_5::agent_t{std::move(ctx)} , mqtt_{std::move(mqtt)} , logger_{std::move(logger)} {} virtual void so_define_agent() override { demo::subscriber_t::subscribe(mqtt_, "/client/+/hello", [&](const so_5::mbox_t & mbox) { so_subscribe(mbox).event(&listener_t::on_hello); }); } private: mosqtt::instance_t mqtt_; std::shared_ptr<spdlog::logger> logger_; void on_hello(mhood_t<demo::subscriber_t::msg_type> cmd) { logger_->trace("message received from topic: {}, payload={}", cmd->topic_name(), cmd->payload()); const auto msg = cmd->decode<demo::client_hello_t>(); logger_->info("hello received. client={}, greeting={}", msg.id_, msg.greeting_); demo::publisher_t::publish( mqtt_, msg.reply_topic_, demo::server_hello_t{"World, hello!"}); }
};

Это, можно сказать, типичный маленький SObjectizer-овский агент. В своем перегруженном методе so_define_agent() выполняется подписка на сообщения из топиков, удовлетворяющих маске "/client/+/hello". Когда такие сообщения от брокера будут приходить, то для их обработки будет вызываться метод on_hello.

Т.е. В метод on_hello() передается специальный объект, который содержит имя топика (в который сообщение было опубликовано) и само сообщение именно в том виде, в котором оно пришло от MQTT-брокера. Но мы хотим получить это сообщение в виде объекта типа client_hello_t. в JSON-представлении. Для этого вызывается метод decode(), в котором и происходит десериализация тела сообщения из JSON-а в client_hello_t.

Имя топика для публикации ответа берется из пришедшего client_hello_t. Ну а в конце on_hello(), публикуется ответное сообщение.

Основная часть demo_client_app

В demo_client_app так же основную работу выполняет единственный агент типа client_t. Но этот агент несколько посложнее, чем listener_t в demo_server_app. Вот код client_t:

class client_t final : public so_5::agent_t {
public: client_t( context_t ctx, int id, mosqtt::instance_t mqtt, std::shared_ptr<spdlog::logger> logger) : so_5::agent_t{std::move(ctx)} , id_{id} , mqtt_{std::move(mqtt)} , logger_{std::move(logger)} { } virtual void so_define_agent() override { // Подписываемся на уведомления о наличии подключения к брокеру. so_subscribe(mqtt_.mbox()).event(&client_t::on_broker_connected); demo::subscriber_t::subscribe(mqtt_, make_topic_name("/replies"), [&](const so_5::mbox_t & mbox) { so_subscribe(mbox).event(&client_t::on_reply); }); } private: const int id_; mosqtt::instance_t mqtt_; std::shared_ptr<spdlog::logger> logger_; std::string make_topic_name(const char * suffix) const { return std::string("/client/") + std::to_string(id_) + suffix; } void on_broker_connected(mhood_t<mosqtt::broker_connected_t>) { // После того, как подключились к брокеру можем сообщить о // своем существовании. demo::publisher_t::publish( mqtt_, make_topic_name("/hello"), demo::client_hello_t{ id_, "Hello, World", make_topic_name("/replies")}); } void on_reply(mhood_t<demo::subscriber_t::msg_type> cmd) { logger_->trace("message received from topic: {}, payload={}", cmd->topic_name(), cmd->payload()); const auto msg = cmd->decode<demo::server_hello_t>(); logger_->info("hello received. greeting={}", msg.greeting_); // Теперь мы свою работу можем завершить. logger_->warn("finishing"); so_environment().stop(); }
};

Здесь агент client_t обрабатывает два события. С событием, на которое подписывается обработчик on_reply(), все должно быть понятно — это обработка сообщения, которое приходит от MQTT-брокера.

Дело в том, что когда клиент стартует, нужно выбрать момент, когда можно публиковать начальное сообщение в топике "/client/:ID/hello". Пояснить следует обработчик on_broker_connected. еще нет подключения к MQTT-брокеру. Поскольку если агент client_t сделает это сразу после своего старта, то сообщение никуда не уйдет и будет потеряно, т.к. Поэтому агенту client_t нужно дождаться, пока это подключение будет установлено.

Когда связь с брокером появляется (т.е. Взаимодействием с MQTT-брокером занимается a_transport_manager_t. На этот сигнал подписывается client_t и когда сигнал приходит, client_t публикует сообщение client_hello_t. когда не только установили TCP-соединение с брокером, но и когда обменялись с брокером сообщениями CONNECT/CONNACT) агент a_transport_manager_t отсылает в специальный mbox сигнал broker_connected_t.

Результаты работы приложения

Запустив demo_server_app можно увидеть приблизительно следующую картину:

Здесь видно, как demo_server_app подключился к брокеру, как сделал подписку, как получил входящее сообщение и как ответил на него.

Запустив demo_client_app можно увидеть картинку вот такого плана:

Здесь видно, как demo_client_app подключился к брокеру, как сделал подписку, как отослал свое приветственное сообщение и как получил сообщение от demo_server_app в ответ.

Выше мы показали как выглядит на данный момент получившаяся у нас библиотека mosquitto_transport. Теперь можно попробовать сказать несколько слов о ряде проектных решений, которые были положены в основу реализации.

Небольшой дисклаймер: mosquitto_transport сама по себе является прототипом

Когда у нас появилась задача предоставить возможность SObjectizer-овским агентам общаться с внешним миром через MQTT, то мы быстро убедились, что использовать такие широко распространенные библиотеки, как libmosquitto и Paho не удобно. С примитивными C++ными обертками над ними так же все было печально.

Тогда как нам нужно, чтобы в приложении было несколько независимых агентов, каждый из которых должен получать только те сообщения, в которых он заинтересован. Главная проблема, в частности, с libmosquitto, заключалась в том, что в libmosquitto предполагается делать обработку вообще всех входящих сообщений в одном и том же коллбэке.

Чтобы этой слой скрывал от агентов всю требуху работы с libmosquitto-вскими вызовами и коллбэками. Соответственно, нужно было сделать какой-то слой, который бы располагался между прикладными агентами и libmosquitto. Но каким должен быть этот слой?

В mosquitto_transport мы попытались найти этот ответ. Готового ответа у нас, естественно, не было. Больше всего работы потребовала поддержка wildcard-ов в именах топиков при подписке. Получилось более-менее нормально, особенно с учетом того, что сил в mosquitto_transport было вложено, откровенно говоря, совсем немного. Ну и написание понятного README, чтобы затем самим не забыть, как пользоваться mosquitto_transport.

Кроме того, в существующей реализации остались серьезные белые пятна. Наверняка можно было бы сделать лучше. Или вопросы производительности, которые в рамках разрабатываемого прототипа были несущественными. Например, поддержка уровней QoS отличных от 0.

Который можно брать и пробовать. Так что сами мы рассматриваем текущую реализацию mosquitto_transport как вполне себе работающий и отлаженный прототип. Но который, наверняка, придется доработать напильником для каких-то других условий.

Всю основную работу делает a_transport_manager_t и одна вспомогательная рабочая нить

В mosquitto_transport практически всю работу выполняет агент a_transport_manager_t. Он создает экземпляр mosquitto, он реализует все нужные коллбэки, он запускает основной цикл событий libmosquitto на контексте отдельной рабочей нити посредством mosquitto_loop_start(), он же затем останавливает этот цикл через mosquitto_loop_stop.

Но в работе a_transport_manager_t есть несколько фокусов, на которых можно заострить внимание.

Делегирование работы с одной рабочей нити на другую

Основной цикл обработки событий libmosquitto выполняется на контексте отдельной рабочей нити. Это означает, что и коллбэки, вроде mosquitto_log_callback, mosquitto_connect_callback, mosquitto_subscribe_callback и т.д., вызываются на контексте этой самой отдельной нити. Но их этих коллбэков нужно получать доступ к данным агента a_transport_manager_t. Который, в свою очередь, работает на другой рабочей нити (или даже на нескольких других нитях в случае adv_thread_pool-диспетчера).

Вместо этого мы из коллбэка отсылаем агенту a_transport_manager_t сообщение. Мы не стали защищать данные агента a_transport_manager_t какими-то примитивами синхронизации (вроде std::mutex-а). Из-за чего при обработке сообщения нам вообще не нужно думать о синхронизации доступа к данным. Которое уже будет обработано на основном рабочем контексте агента.

Реализация mosquitto_message_callback: Вот, например, как это выглядит в коде.

void
a_transport_manager_t::on_message_callback( mosquitto *, void * this_object, const mosquitto_message * msg ) { auto tm = reinterpret_cast< a_transport_manager_t * >(this_object); tm->m_logger->trace( "on_message, topic={}, payloadlen={}, qos={}" ", retain={}", msg->topic, msg->payloadlen, msg->qos, msg->retain ); so_5::send< message_received_t >( tm->m_self_mbox, *msg ); }

И обработчик сообщения message_received_t:

void
a_transport_manager_t::on_message_received( const message_received_t & cmd ) { auto subscribers = m_delivery_map.match( cmd.m_topic ); if( !subscribers.empty() ) { for( auto * s : subscribers ) s->deliver_message( cmd.m_topic, cmd.m_payload ); } else m_logger->warn( "message for unregistered topic, topic={}, " "payloadlen={}", cmd.m_topic, cmd.m_payload.size() ); }

Асинхронная работа с подписками

Когда какой-то агент делает вызов subscribe(), то подписка на соответствующий MQTT-шный топик выполняется не синхронно внутри subscribe(), а асинхронно. Вначале агенту a_transport_manager_t отсылается сообщение subscribe_topic_t. Затем, агент a_transport_manager_t обрабатывает это сообщение и пытается создать подписку на MQTT-брокере. Что может быть в данный момент и невозможно, если сейчас связи с брокером нет.

Обычно это не проблема, но помнить об этом факте следует. Соответственно, вызов subscribe() вовсе не означает, что при выходе из него подписка уже создана и агент сразу же начнет получать опубликованные в соответствующем топике сообщения.

Например, агент хочет сперва подписаться на некий топик и лишь затем начать публиковать свои сообщения. Иногда агенту желательно знать, существует ли подписка на топик или же подписки по каким-то причинам нет. Скажем, вот так может выглядеть код по подписке на топик с получением уведомления о том, что подписка была выполнена успешно: В этом случае агент может подписаться на специальные сообщения subscription_available_t, subscription_unavailable_t и subscription_failed_t.

mosqtt::topic_subscriber_t<my_encoding_tag>::subscribe( mqtt, "some/topic/name", [&](const so_5::mbox_t & mbox) { // Из mbox будут прилетать как опубликованные в топике сообщения... so_subscribe(mbox).event( [&](mhood_t<mosqtt::topic_subscriber_t<my_encoding_tag>::msg_type> cmd) { const auto msg = cmd->decode<my_type>(); ... }); // ...но и сообщения subscription_available_t. so_subscribe(mbox).event( [&](mhood_t<mosqtt::subscription_available_t> cmd) { ... }); });

Автоматическая отписка от топиков

Подписки на MQTT-шные топики создаются через вызовы subscribe(). А вот удаляются подписки автоматически. Происходит это за счет небольшой уличной магии, спрятанной в SObjectizer-овских mbox-ах (которые можно создавать под собственные, возможно, весьма специфические задачи).

Этот служебный mbox точно знает, сколько подписок сделано. Дело в том, что метод subscribe() создает специальный служебный mbox и именно этот mbox затем отдается в лямбда-функцию, в которой программист подписывает своего агента. Когда все подписки уничтожаются, например, при дерегистрации агента или при ручном удалении подписок агента, mbox понимает, что подписок больше нет и просить a_transport_manager_t удалить подписку на MQTT-шный топик.

Метод so_define_agent() как классический пример настройки агента с несколькими состояниями

Обычно мы сталкиваемся с двумя противоположными реакциями людей, которые знакомятся с SObjectizer-ом. Первые говорят о том, что у нас все как-то сложно, агентов нужно от чего-то наследовать, нужно знать про какие-то so_define_agent(), so_evt_start() и т.д. А еще у агентов непонятно зачем есть какие-то непонятные состояния.

А наличие методов so_define_agent(), so_evt_start() и so_evt_finishe(), как раз таки упрощает и написание агентов и, особенно, разбирательства с чужими агентами: мол сразу знаешь куда смотреть и где что искать. Вторые, напротив, говорят о том, что состояния у агентов — это классно.

И, думается, содержимое a_transport_manager_t является хорошим тому подтверждением. Естественно, что вторая точка зрения нам гораздо ближе. В частности, метод so_define_agent() сразу дает представление о том, что, как и когда этот агент обрабатывает:

void
a_transport_manager_t::so_define_agent() { st_working .event( m_self_mbox, &a_transport_manager_t::on_subscribe_topic ) .event( m_self_mbox, &a_transport_manager_t::on_unsubscribe_topic ) .event( m_self_mbox, &a_transport_manager_t::on_message_received, so_5::thread_safe ); st_disconnected .on_enter( [this] { // Everyone should be informed that connection lost. so_5::send< broker_disconnected_t >( m_self_mbox ); } ) .event< connected_t >( m_self_mbox, &a_transport_manager_t::on_connected ); st_connected .on_enter( [this] { // Everyone should be informed that connection established. so_5::send< broker_connected_t >( m_self_mbox ); // All registered subscriptions must be restored. restore_subscriptions_on_reconnect(); } ) .on_exit( [this] { // All subscriptions are lost. drop_subscription_statuses(); // No more pending subscriptions. m_pending_subscriptions.clear(); } ) .event< disconnected_t >( m_self_mbox, &a_transport_manager_t::on_disconnected ) .event( m_self_mbox, &a_transport_manager_t::on_subscription_result ) .event( m_self_mbox, &a_transport_manager_t::on_publish_message, so_5::thread_safe ) .event( &a_transport_manager_t::on_pending_subscriptions_timer ); }

Причем агент a_transport_manager_t использует иерархию состояний, пусть и очень простую:

state_t st_working{ this, "working" };
state_t st_disconnected{ initial_substate_of{ st_working }, "disconnected" };
state_t st_connected{ substate_of{ st_working }, "connected" };

При этом нельзя не отметить, что a_transport_manager_t далеко не самый сложный SObjectizer-овский агент, который нам приходилось реализовывать. Чем больше и сложнее агент, тем выигрышнее оказывается наличие специализированных методов so_define_agent(), so_evt_start() и so_evt_finish().

Более подробно про сериализацию/десериализацию сообщений

Выше мы показали, как пользователь должен настроить сериализацию и десериализацию своих сообщений. Но не объяснили почему это делается именно так. Хотя это весьма принципиальный вопрос. Можно даже назвать его «краеугольным камнем». Поэтому нужно посвятить несколько слов этому аспекту.

Поэтому нужно было сделать так, чтобы mosquitto_transport не был привязан к конкретному формату данных. Во-первых, несмотря на то, что mosquitto_transport мы делали для конкретного проекта, мы хотели сделать так, чтобы mosquitto_transport можно было бы переиспользовать и для других задач. Возможно, даже используя несколько разных представлений в рамках одной программы. Чтобы можно было передавать данные хоть в JSON, хоть в XML, хоть в ProtoBuf, хоть в каком-то другом представлении.

Тратить на эти операции ресурсы a_transport_manager_t не хотелось, т.к. Во-вторых, нам нужно было определиться с тем, кто будет тратить ресурсы на сериализацию/десериализацию сообщений: агент a_transport_manager_t или же прикладные агенты пользователя. Поэтому мы пошли по второму пути: сериализация и десериализация выполняется на контексте пользователя. это прямой путь к тому, чтобы a_transport_manager_t стал узким местом. А вот десериализация выполняется вручную — пользователь сам должен указать, какой тип он хочет получить на выходе при вызове метода incoming_message_t::decode(). Причем сериализация выполняется автоматически внутри topic_publisher_t::publish. Вот как в этом примере:

void on_reply(mhood_t<demo::subscriber_t::msg_type> cmd) { ... const auto msg = cmd->decode<demo::server_hello_t>();

В-третьих, есть еще момент ошибок, которые могут возникать при сериализации/десериализации сообщений. Например, пришло сообщение в формате XML, а мы попытались распарсить его как JSON. Получим ошибку и… И возникает вопрос: как об этой ошибке информировать и как эту ошибку обрабатывать?

Если при вызове publish() или decode() возникнет связанное с ошибкой сериализации/десериализации исключение, то пользователь сможет это исключение обработать любым удобным для него способом. Это еще одна причина, почему сериализация/десериализация вынесена на контекст пользователя.

Такая схема позволяет иметь сразу несколько форматов для сообщений в одном приложении. Вот по совокупности всех этих факторов и была выбрана описанная ранее схема: для каждого формата данных пользователь должен определить специальный тип-тег, для которого нужно сделать частичную специализацию типов encoder_t и decoder_t. Плюс все разрешается статически во время компиляции, поэтому если для какого-то типа или формата сериализация/десериализация не реализована, то это будет обнаружено в compile-time.

Забавно, но каких-то особых впечатлений от использования mosquitto_transport-а не осталось. Оно просто работает.

Из описанного выше видно, что больше всего кода при работе с mosquitto_transport приходится делать для двух вещей:

  • во-первых, создание a_transport_manager_t в начале работы приложения;
  • во-вторых, написание кода по сериализации/десериализации сообщений;

Но, как оказалось, на практике ни один из этих моментов проблем не вызвал. Код для создания a_transport_manager_t довелось написать всего несколько раз: в паре-тройке тестов, ну и в основном приложении, для которого все это предназначалось. Причем, по сравнению с другим кодом, который нужен для инициализации приложения (разбор аргументов командной строки, чтение конфигурации, создание и настройка логгеров и т.д., и т.п.) несколько строчек для запуска a_transport_manager_t — это пусть и не капля в море, но все равно совсем немного.

Ну и его все равно придется писать в том или ином виде, по крайней мере до тех пор, пока в C++ не завезут рефлексию. Благодаря json_dto, кода для сериализации/десериализации сообщение приходится писать довольно мало.

Так что какой-то особой трудоемкости в использовании mosquitto_transport при реализации десятка агентов, общающихся через MQTT с внешним миром и использующих в своей работы порядка трех десятков сообщений, мы не заметили.

Можно только пожалеть, что на глаза не попалось ничего более достойного, чем libmosquitto и Paho. Вот что оставило гораздо более яркие впечатление, так это работа с libmosquitto и изучение поведения MQTT-шного брокера в каких-то ситуациях через изучения его исходного кода. Но это уже совсем другая история…

Вот, пожалуй, и все, что мы хотели рассказать о своем эксперименте по реализации распределенности для SObjectizer-приложений на базе MQTT и libmosquitto. Если какие-то интересующие вас моменты остались «за кадром», то задавайте вопросы — постараемся раскрыть интересующие вас моменты в комментариях.

В заключение, уже традиционно, предлагаем попробовать SObjectizer (github) в деле и поделиться своими впечатлениями: практически все конструктивные отзывы используются нами для совершенствования SObjectizer-а.

Наличие распределенности в ядре SObjectizer — местами хорошо, но местами сильно не очень (как показывает опыт SObjectizer-4). Но сегодня хочется затронуть еще такой аспект. Такое ощущение, что получается как в известной мудрости: чтобы ты не выбрал, все равно пожалеешь. Отсутствие распределенности в ядре — опять местами хорошо, но местами не очень. Например, один из распространенных вопросов в адрес SObjectizer-5: «А что у вас с распределенностью?»

Уже начинает надоедать объяснять одно и тоже по сотому разу, посему закрадывается мысль: «А не проще ли сделать для SO-5 механизм для поддержки распределенности?» В связи с чем просим поделиться в комментариях своими соображениями на эту тему.

Если да, то для каких задач? Нужна ли вам распределенность в SO-5? Или же вы передаете большие BLOB-ы? Нужно ли вам передавать множество мелких сообщений? Или же вас устраивает протокол/формат, который поддерживается только одним фреймворком? Нужна ли вам интероперабильность с другими языками? Какие-то средства интроспекции, мониторинга и сбора статистики по трафику? Нужны ли вам подтверждения о доставке отдельных сообщений, автоматические перепосылки и т.д.?

А может быть и не проще. В общем, чем больше хотелок будет озвучено, тем проще нам будет определиться с тем, делать ли поддержку прозрачной распределенности и, если делать, то в каком виде. Но, в любом случае, будет какая-то информация для размышлений.

Теги
Показать больше

Похожие статьи

Добавить комментарий

Ваш e-mail не будет опубликован. Обязательные поля помечены *

Кнопка «Наверх»
Закрыть