Хабрахабр

«Современные» обедающие философы на C++ посредством акторов и CSP

Статья интересная, она показывает несколько решений этой известной задачи, реализованных на современном C++ с использованием task-based подхода. Некоторое время назад ссылка на статью "Modern dining philosophers" распространилась по ресурсам вроде Reddit и HackerNews. Если кто-то это статью еще не читал, то имеет смысл потратить время и прочесть ее.

Вероятно это как раз из-за использования тасков. Однако, не могу сказать, что представленные в статье решения мне показались простыми и понятными. Так что не всегда понятно, где, когда и какие задачи выполняются. Слишком уж их много создается и диспетчируется посредством разнообразных диспетчеров/сериализаторов.

Почему бы не посмотреть, как задача "обедающих философов" решается посредством моделей Акторов и CSP? При этом task-based подход не является единственным возможным для решения подобных задач.

Код этих решений можно найти в репозитории на BitBucket-е. Посему попробовал посмотреть и реализовал несколько решений этой задачи как с использованием Акторов, так и с использованием CSP. А под катом пояснения и объяснения, так что кому интересно, милости прошу под кат.

Он только говорит "хочу есть", а дальше либо ему кто-то магическим образом предоставляет вилки, либо говорит "сейчас не получится". У меня не было цели в точности повторить решения, показанные в той самой статье "Modern dining philosophers", тем более, что мне в них принципиально не нравится одна важная штука: по сути, в тех решениях "философ" ничего сам не делает.

Однако, мне лично кажется, что более интересно когда именно "философ" пытается взять сперва одну вилку, затем другую. Понятно, почему автор прибег к такому поведению: оно позволяет использовать одну и ту же реализацию "философа" совместно с разными реализациями "протоколов". И когда "философ" вынужден обрабатывать неудачные попытки захвата вилки.

При этом в некоторых решениях использовались те же самые подходы, что и в упомянутой статье (например, реализуемые протоколами ForkLevelPhilosopherProtocol и WaiterFair). Именно такие реализации задачи "обедающих философов" я и попробовал сделать.

Если же кто-то про SObjectizer еще не слышал, то в двух словах: это один из немногих живых и развивающихся OpenSource "акторных фреймворков" для С++ (из прочих можно упомянуть так же CAF и QP/C++). Свои решения я строил на базе SObjectizer-а, что вряд ли удивит тех, кто читал мои статьи раньше. Если нет, то я с удовольствием отвечу на вопросы в комментариях. Надеюсь, что приведенные примеры с моими комментариями будут достаточно понятными даже для незнакомых с SObjectizer-ом.

Сперва рассмотрим реализацию решения Эдсгера Дейкстры, затем перейдем к нескольким другим решениям и посмотрим, как отличается поведение каждого из решений. Обсуждение реализованных решений начнем с тех, которые сделаны на базе Акторов.

Решение Дейкстры

А именно: философы должны захватывать вилки только в порядке увеличения номеров вилок и если философ сумел взять первую вилку, то он уже не отпускает ее пока не получит и вторую вилку. Эдсгер Дейкстра, мало того, что сформулировал саму задачу "обедающих филофосов" (формулировку оной с использованием "вилок" и "спагетти" озвучил Тони Хоар), так еще и предложил очень простое и красивое решение.

Только после этого он может взять вилку номер 6. Например, если философу нужно пользоваться вилками с номерами 5 и 6, то философ должен сперва взять вилку номер 5. Т.о., если вилки с меньшими номерами лежат слева от философов, то философ должен сперва взять левую вилку и лишь затем он может взять правую вилку.

Последний философ в списке, которому приходится иметь дело с вилками за номерами (N-1) и 0, поступает наоборот: он сперва берет правую вилку с номером 0, а затем левую вилку с номером (N-1).

Если философ захотел поесть, то он отсылает соответствующему актору-вилке сообщение на захват вилки, а актор-вилка отвечает ответным сообщением. Для реализации этого подхода потребуется два типа акторов: один для вилок, второй для философов.

Код реализации этого подхода можно увидеть здесь.

Сообщения

Прежде чем говорить об акторах, нужно посмотреть на сообщения, которыми акторы будут обмениваться:

struct take_t
{ const so_5::mbox_t m_who; std::size_t m_philosopher_index;
}; struct taken_t : public so_5::signal_t ; struct put_t : public so_5::signal_t {};

Когда актор-философ заканчивает есть и хочет положить вилки обратно на стол, он отсылает акторам-вилкам сообщения put_t. Когда актор-философ хочет взять вилку, он отсылает актору-вилке сообщение take_t, а актор-вилка отвечает сообщением taken_t.

В этот почтовый ящик должно быть отправлено ответное сообщение taken_t. В сообщении take_t поле m_who обозначает почтовый ящик (он же mbox) актора-философа. Второе поле из take_t в данном примере не используется, оно нам потребуется, когда мы дойдем до реализаций waiter_with_queue и waiter_with_timestamps.

Актор-вилка

Вот его код: Теперь мы можем посмотреть на то, что из себя представляет актор-вилка.

class fork_t final : public so_5::agent_t
{
public : fork_t( context_t ctx ) : so_5::agent_t{ std::move(ctx) } {} void so_define_agent() override { // Начальным должно быть состояние 'free'. this >>= st_free; // В состоянии 'free' обрабатывается только одно сообщение. st_free .event( [this]( mhood_t<take_t> cmd ) { this >>= st_taken; so_5::send< taken_t >( cmd->m_who ); } ); // В состоянии 'taken' обрабатываются два сообщения. st_taken .event( [this]( mhood_t<take_t> cmd ) { // Философу придется подождать в очереди. m_queue.push( cmd->m_who ); } ) .event( [this]( mhood_t<put_t> ) { if( m_queue.empty() ) // Вилка сейчас никому не нужна. this >>= st_free; else { // Вилка должна достаться первому из очереди. const auto who = m_queue.front(); m_queue.pop(); so_5::send< taken_t >( who ); } } ); } private : // Определение состояний для актора. const state_t st_free{ this, "free" }; const state_t st_taken{ this, "taken" }; // Очередь ждущих философов. std::queue< so_5::mbox_t > m_queue;
};

Что мы и видим здесь для типа fork_t. Каждый актор в SObjectizer-е должен быть производен от базового класса agent_t.

Это специальный метод, он автоматически вызывается SObjectizer-ом при регистрации нового агента. В классе fork_t переопределяется метод so_define_agent(). В методе so_define_agent() производится "настройка" агента для работы в SObjectizer-е: меняется стартовое состояние, производится подписка на нужные сообщения.

Вот у актора fork_t есть два состояния: free и taken. Каждый актор в SObjectizer-е представляет из себя конечный автомат с состояниями (даже если актор использует всего одно дефолтное состояние). И после захвата "вилки" актор fork_t должен перейти в состояние taken. Когда актор в состоянии free, вилка может быть "захвачена" философом. Внутри класса fork_t состояния представляются экземплярами st_free и st_taken специального типа state_t.

Например, в состоянии free агент реагирует только на take_t и реакция эта очень простая: меняется состояние актора и отсылается ответное taken_t: Состояния позволяют обрабатывать входящие сообщения по-разному.

st_free .event( [this]( mhood_t<take_t> cmd ) { this >>= st_taken; so_5::send< taken_t >( cmd->m_who ); } );

Тогда как все остальные сообщения, включая put_t в состоянии free попросту игнорируются.

В состоянии же taken актор обрабатывает два сообщения, и даже сообщение take_t он обрабатывает иначе:

st_taken .event( [this]( mhood_t<take_t> cmd ) { m_queue.push( cmd->m_who ); } ) .event( [this]( mhood_t<put_t> ) { if( m_queue.empty() ) this >>= st_free; else { const auto who = m_queue.front(); m_queue.pop(); so_5::send< taken_t >( who ); } } );

Здесь наиболее интересен обработчик для put_t: если очередь ждущих философов пуста, то мы можем вернуться в free, а вот если не пуста, то первому из них нужно отослать taken_t.

Актор-философ

Мы обсудим лишь наиболее значимые фрагменты. Код актора-философа гораздо более объемен, поэтому я не буду приводить его здесь полностью.

У актора-философа немного больше состояний:

state_t st_thinking{ this, "thinking.normal" };
state_t st_wait_left{ this, "wait_left" };
state_t st_wait_right{ this, "wait_right" };
state_t st_eating{ this, "eating" }; state_t st_done{ this, "done" };

Из eating актор может вернуться в thinking или же может перейти в done, если философ съел все, что должен был. Актор начинает свою работу в состоянии thinking, потом переключается в wait_left, затем в wait_right, затем в eating.

Диаграмму состояний для актора-философа можно представить следующим образом:

image

Логика же поведения актора описана в реализации его метода so_define_agent():

void so_define_agent() override
{ // В состоянии thinking реагируем только на сигнал stop_thinking. st_thinking .event( [=]( mhood_t<stop_thinking_t> ) { // Пытаемся взять левую вилку. this >>= st_wait_left; so_5::send< take_t >( m_left_fork, so_direct_mbox(), m_index ); } ); // Когда ждем левую вилку реагируем только на taken. st_wait_left .event( [=]( mhood_t<taken_t> ) { // У нас есть левая вилка. Пробуем взять правую. this >>= st_wait_right; so_5::send< take_t >( m_right_fork, so_direct_mbox(), m_index ); } ); // Пока ждем правую вилку, реагируем только на taken. st_wait_right .event( [=]( mhood_t<taken_t> ) { // У нас обе вилки, можно поесть. this >>= st_eating; } ); // Пока едим реагируем только на stop_eating. st_eating // 'stop_eating' должен быть отослан как только входим в 'eating'. .on_enter( [=] { so_5::send_delayed< stop_eating_t >( *this, eat_pause() ); } ) .event( [=]( mhood_t<stop_eating_t> ) { // Обе вилки нужно вернуть на стол. so_5::send< put_t >( m_right_fork ); so_5::send< put_t >( m_left_fork ); // На шаг ближе к финалу. ++m_meals_eaten; if( m_meals_count == m_meals_eaten ) this >>= st_done; // Съели все, что могли, пора завершаться. else think(); } ); st_done .on_enter( [=] { // Сообщаем о том, что мы закончили. completion_watcher_t::done( so_environment(), m_index ); } );
}

В коде актора нет this_thread::sleep_for или какого-то другого способа блокирования текущей рабочей нити. Пожалуй, единственный момент, на котором следует остановится особо — это подход к имитации процессов "размышления" и "еды". Например, когда актор входит в состояние eating он отсылает самому себе отложенное сообщение stop_eating_t. Вместо этого используются отложенные сообщения. Это сообщение отдается таймеру SObjectizer-а и таймер доставляет сообщение актору когда приходит время.

Грубо говоря, одна нить читает из какой-то очереди сообщения и дергает обработчик очередного сообщения у соответствующего актора-получателя. Использование отложенных сообщений позволяет запускать всех акторов на контексте одной единственной рабочей нити. Подробнее о рабочих контекстах для акторов речь пойдет ниже.

Результаты

Результаты работы этой реализации могут выглядеть следующим образом (небольшой фрагмент):

Socrates: tttttttttttLRRRRRRRRRRRRRREEEEEEEttttttttLRRRRRRRRRRRRRREEEEEEEEEEEEE Plato: ttttttttttEEEEEEEEEEEEEEEEttttttttttRRRRRREEEEEEEEEEEEEEttttttttttLLL Aristotle: ttttEEEEEtttttttttttLLLLLLRRRREEEEEEEEEEEEttttttttttttLLEEEEEEEEEEEEE Descartes: tttttLLLLRRRRRRRREEEEEEEEEEEEEtttLLLLLLLLLRRRRREEEEEEttttttttttLLLLLL Spinoza: ttttEEEEEEEEEEEEEttttttttttLLLRRRREEEEEEEEEEEEEttttttttttRRRREEEEEEtt Kant: ttttttttttLLLLLLLRREEEEEEEEEEEEEEEttttttttttLLLEEEEEEEEEEEEEEtttttttt
Schopenhauer: ttttttEEEEEEEEEEEEEttttttLLLLLLLLLEEEEEEEEEttttttttLLLLLLLLLLRRRRRRRR Nietzsche: tttttttttLLLLLLLLLLEEEEEEEEEEEEEttttttttLLLEEEEEEEEEttttttttRRRRRRRRE
Wittgenstein: ttttEEEEEEEEEEtttttLLLLLLLLLLLLLEEEEEEEEEttttttttttttRRRREEEEEEEEEEEt Heidegger: tttttttttttLLLEEEEEEEEEEEEEEtttttttLLLLLLREEEEEEEEEEEEEEEtttLLLLLLLLR Sartre: tttEEEEEEEEEttttLLLLLLLLLLLLRRRRREEEEEEEEEtttttttLLLLLLLLRRRRRRRRRRRR

Читать это следует следующим образом:

  • t обозначает, что философ "размышляет";
  • L обозначает, что философ ожидает захвата левой вилки (находится в состоянии wait_left);
  • R обозначает, что философ ожидает захвата правой вилки (находится в состоянии wait_right);
  • E обозначает, что философ "ест".

После чего Сократ будет ждать, пока Платон освободит правую вилку. Мы можем видеть, что Сократ может взять вилку слева только после того, как Сартр отдаст ее. Только после этого Сократ сможет поесть.

Простое решение без арбитра (официанта)

Что не есть хорошо, т.к. Если мы проанализируем результат работы решения Дейкстры, то увидим, что философы проводят много времени в ожидании захвата вилок. Не зря же бытует мнение, что если размышлять на голодный желудок, то можно получить гораздо более интересные и неожиданные результаты 😉 это время можно также потратить на раздумья.

Давайте посмотрим на простейшее решение, в котором философ возвращает первую захваченную вилку, если он не может захватить вторую (в упомянутой выше статье "Modern dining philosophers" это решение реализует ForkLevelPhilosopherProtocol).

Исходный код этой реализации можно увидеть здесь, а код соответствующего актора-философа здесь.

Сообщения

В данном решении используется практически такой же набор сообщений:

struct take_t
{ const so_5::mbox_t m_who; std::size_t m_philosopher_index;
}; struct busy_t : public so_5::signal_t {}; struct taken_t : public so_5::signal_t {}; struct put_t : public so_5::signal_t {};

Этот сигнал актор-вилка отсылает в ответ актору-философу если вилка уже захвачена другим философом. Единственное отличие — это присутствие сигнала busy_t.

Актор-вилка

Актор-вилка в этом решении оказывается даже проще, чем в решении Дейкстры:


class fork_t final : public so_5::agent_t
{
public : fork_t( context_t ctx ) : so_5::agent_t( ctx ) { this >>= st_free; st_free.event( [this]( mhood_t<take_t> cmd ) { this >>= st_taken; so_5::send< taken_t >( cmd->m_who ); } ); st_taken.event( []( mhood_t<take_t> cmd ) { so_5::send< busy_t >( cmd->m_who ); } ) .just_switch_to< put_t >( st_free ); } private : const state_t st_free{ this }; const state_t st_taken{ this };
};

Нам здесь даже не нужно хранить очередь ждущих философов.

Актор-философ

Актор-философ в этой реализации похож на оного из решения Дейкстры, но здесь актору-философу приходится обрабатывать еще и busy_t, поэтому диаграмма состояний выглядит следующим образом:

image

Аналогично, вся логика актора-философа определяется в so_define_agent():

void so_define_agent() override
{ st_thinking .event< stop_thinking_t >( [=] { this >>= st_wait_left; so_5::send< take_t >( m_left_fork, so_direct_mbox(), m_index ); } ); st_wait_left .event< taken_t >( [=] { this >>= st_wait_right; so_5::send< take_t >( m_right_fork, so_direct_mbox(), m_index ); } ) .event< busy_t >( [=] { think( st_hungry_thinking ); } ); st_wait_right .event< taken_t >( [=] { this >>= st_eating; } ) .event< busy_t >( [=] { so_5::send< put_t >( m_left_fork ); think( st_hungry_thinking ); } ); st_eating .on_enter( [=] { so_5::send_delayed< stop_eating_t >( *this, eat_pause() ); } ) .event< stop_eating_t >( [=] { so_5::send< put_t >( m_right_fork ); so_5::send< put_t >( m_left_fork ); ++m_meals_eaten; if( m_meals_count == m_meals_eaten ) this >>= st_done; else think( st_normal_thinking ); } ); st_done .on_enter( [=] { completion_watcher_t::done( so_environment(), m_index ); } );
}

В общем-то, это практически такой же код, как и в решении Дейкстры, разве что добавилась пара обработчиков для busy_t.

Результаты

Результаты работы выглядят уже по-другому:

Socrates: tttttttttL..R.....EEEEEEEEEEEEttttttttttR...L.L...EEEEEEEttEEEEEE Plato: ttttEEEEEEEEEEEttttttL.....L..EEEEEEEEEEEEEEEttttttttttL....L.... Aristotle: ttttttttttttL..L.R..EEEEEEtttttttttttL..L....L....R.....EEEEEEEEE Descartes: ttttttttttEEEEEEEEttttttttttttEEEEEEEEttttEEEEEEEEEEEttttttL..L.. Spinoza: ttttttttttL.....L...EEEEEEtttttttttL.L......L....L..L...R...R...E Kant: tttttttEEEEEEEttttttttL.L.....EEEEEEEEttttttttR...R..R..EEEEEtttt
Schopenhauer: tttR..R..L.....EEEEEEEttttttR.....L...EEEEEEEEEEEEEEEEttttttttttt Nietzsche: tttEEEEEEEEEEtttttttttEEEEEEEEEEEEEEEttttL....L...L..L....EEEEEEE
Wittgenstein: tttttL.L..L.....R.R.....L.....L....L...EEEEEEEEEEEEEEEtttttttttL. Heidegger: ttttR..R......EEEEEEEEEEEEEttttttttttR..L...L...L..L...EEEEtttttt Sartre: tttEEEEEEEtttttttL..L...L....R.EEEEEEEtttttEEEEtttttttR.....R..R.

Здесь мы видим новый символ, который означает, что актор-философ находится в "голодных раздумьях".

Это потому, что данное решение защищено от проблемы дедлоков, но не имеет защиты от голоданий. Даже на этом коротком фрагменте можно увидеть, что есть длительные моменты времени, на протяжении которых философ не может поесть.

Решение с официантом и очередью

Упомянутая выше статья "Modern dining philosophers" содержит решение проблемы голоданий в виде протокола WaiterFair. Показанное выше простейшее решение без арбитра не защищает от голоданий. А у официанта есть очередь заявок от философов. Суть в том, что появляется арбитр (официант), к которому обращаются философы когда хотят поесть. И вилки достаются философу только если обе вилки сейчас свободны, а в очереди еще нет ни одного из соседей того философа, который обратился к официанту.

Давайте посмотрим на то, как это же решение может выглядеть на акторах.

Исходный код этой реализации можно найти здесь.

Трюк

Но я хотел сохранить не только уже существующий набор сообщений (т.е. Проще всего было бы ввести новый набор сообщений, посредством которого философы могли бы общаться с официантом. Я так же хотел, чтобы использовался тот же самый актор-философ, что и в предыдущем решении. take_t, taken_t, busy_t, put_t). Поэтому мне нужно было решить хитрую задачку: как сделать так, чтобы актор-философ общался с единственным актором-официантом, но при этом думал, что он взаимодействует напрямую с акторами-вилками (коих уже нет на самом-то деле).

При этом актор-официант подписывается на сообщения из всех этих mbox-ов (что в SObjectizer-е реализуется запросто, т.к. Эта задачка была решена с помощью нехитрого трюка: актор-официант создает набор mbox-ов, ссылки на которые отдаются акторам-философам как ссылки на mbox-ы акторов-вилок. SObjectizer — это реализация не только/столько Модели Акторов, но еще и Pub/Sub поддерживается "из коробки").

В коде это выглядит приблизительно вот так:

class waiter_t final : public so_5::agent_t
{
public : waiter_t( context_t ctx, std::size_t forks_count ) : so_5::agent_t{ std::move(ctx) } , m_fork_states( forks_count, fork_state_t::free ) { // Нужны mbox-ы для каждой "вилки" m_fork_mboxes.reserve( forks_count ); for( std::size_t i{}; i != forks_count; ++i ) m_fork_mboxes.push_back( so_environment().create_mbox() ); } ... void so_define_agent() override { // Требуется создать подписки для каждой "вилки". for( std::size_t i{}; i != m_fork_mboxes.size(); ++i ) { // Нам нужно знать индекс вилки. Поэтому используются лямбды. // Лямбда захватывает индекс и затем отдает этот индекс в // актуальный обработчик входящего сообщения. so_subscribe( fork_mbox( i ) ) .event( [i, this]( mhood_t<take_t> cmd ) { on_take_fork( std::move(cmd), i ); } ) .event( [i, this]( mhood_t<put_t> cmd ) { on_put_fork( std::move(cmd), i ); } ); } } private : ... // Почтовые ящики для несуществующих "вилок". std::vector< so_5::mbox_t > m_fork_mboxes;

сперва создаем вектор mbox-ов для несуществующих "вилок", затем подписываемся на каждый из них. Т.е. Да так подписываемся, чтобы знать к какой именно вилке относится запрос.

Реальным обработчиком входящего запроса take_t является метод on_take_fork():

void on_take_fork( mhood_t<take_t> cmd, std::size_t fork_index )
{ // Используем тот факт, что у левой вилки индекс совпадает // с индексом самого философа. if( fork_index == cmd->m_philosopher_index ) handle_take_left_fork( std::move(cmd), fork_index ); else handle_take_right_fork( std::move(cmd), fork_index );
}

Кстати говоря, именно здесь нам и потребовалось второе поле из сообщения take_t.

Следовательно, мы можем определить, просит ли философ левую вилку или правую. Итак, в on_take_fork() мы имеем исходный запрос и индекс вилки, к которой запрос относится. И, соответственно, мы можем обрабатывать их по-разному (и нам приходится обрабатывать их по-разному).

И мы можем оказаться в одной из следующих ситуаций: Поскольку философ всегда сперва просит левую вилку, то все необходимые проверки нам нужно проделать именно в этот момент.

  1. Обе вилки свободны и могут быть отданы тому философу, который прислал запрос. В этом случае мы отсылаем taken_t философу, а правую вилку помечаем как зарезервированную, чтобы никто больше не мог ее взять.
  2. Вилки не могут быть отданы философу. Не важно почему. Может какая-то из них сейчас занята. Или в очереди есть кто-то из соседей философа. Как бы то ни было, мы помещаем философа, приславшего запрос, в очередь, после чего отсылаем ему busy_t.

Этот запрос будет сразу же удовлетворен, т.к. Благодаря такой логике работы философ, получивший taken_t для левой вилки, может спокойно послать запрос take_t для правой вилки. вилка уже зарезервирована для данного философа.

Результаты

Если запустить получившееся решение, то можно увидеть что-то вроде:

Socrates: tttttttttttL....EEEEEEEEEEEEEEttttttttttL...L...EEEEEEEEEEEEEtttttL. Plato: tttttttttttL....L..L..L...L...EEEEEEEEEEEEEtttttL.....L....L.....EEE Aristotle: tttttttttL.....EEEEEEEEEttttttttttL.....L.....EEEEEEEEEEEtttL....L.L Descartes: ttEEEEEEEEEEtttttttL.L..EEEEEEEEEEEEtttL..L....L....L.....EEEEEEEEEE Spinoza: tttttttttL.....EEEEEEEEEttttttttttL.....L.....EEEEEEEEEEEtttL....L.L Kant: ttEEEEEEEEEEEEEtttttttL...L.....L.....EEEEEttttL....L...L..L...EEEEE
Schopenhauer: ttttL...L.....L.EEEEEEEEEEEEEEEEEtttttttttttL..L...L..EEEEEEEttttttt Nietzsche: tttttttttttL....L..L..L...L...L.....L....EEEEEEEEEEEEttL.....L...L..
Wittgenstein: tttttttttL....L...L....L....L...EEEEEEEttttL......L.....L.....EEEEEE Heidegger: ttttttL..L...L.....EEEEEEEEEEEEtttttL...L..L.....EEEEEEEEEEEttttttL. Sartre: ttEEEEEEEEEEEEEttttttttL.....L...EEEEEEEEEEEEttttttttttttL.....EEEEE

Это потому, что не может возникнуть неудачи или ожидания на запросе правой вилки. Можно обратить внимание на отсутствие символов R.

Еще одно решение с использованием арбитра (официанта)

В некоторых случаях предыдущее решение waiter_with_queue может показывать результаты, похожие вот на этот:

Socrates: tttttEEEEEEEEEEEEEEtttL.....L.L....L....EEEEEEEEEttttttttttL....L.....EE Plato: tttttL..L..L....L.L....EEEEEEEEEEEEEEEttttttttttttL.....EEEEEEEEEttttttt Aristotle: tttttttttttL..L...L.....L.....L....L.....EEEEEEEEEEEEtttttttttttL....L.. Descartes: ttttttttttEEEEEEEEEEttttttL.....L....L..L.....L.....L..L...L..EEEEEEEEtt Spinoza: tttttttttttL..L...L.....L.....L....L.....L..L..L....EEEEEEEEEEtttttttttt Kant: tttttttttL....L....L...L...L....L..L...EEEEEEEEEEEttttttttttL...L......E
Schopenhauer: ttttttL....L..L...L...L.L....L...EEEEEtttttL....L...L.....EEEEEEEEEttttt Nietzsche: tttttL..L..L....EEEEEEEEEEEEEttttttttttttEEEEEEEEEEEEEEEttttttttttttL...
Wittgenstein: tttEEEEEEEEEEEEtttL....L....L..EEEEEEEEEtttttL..L..L....EEEEEEEEEEEEEEEE Heidegger: tttttttttL...L..EEEEEEEEttttL..L.....L...EEEEEEEEEtttL.L..L...L....L...L Sartre: ttttttttttL..L....L...L.EEEEEEEEEEEtttttL...L..L....EEEEEEEEEEtttttttttt

Например, левая и правая вилки для Канта свободны на протяжении длительного времени, но Кант не может их взять, т.к. Можно увидеть наличие достаточно длинных периодов времени, когда философы не могут поесть даже не смотря на наличие свободных вилок. Которые ждут своих соседей. в очереди ожидания уже стоят его соседи. Которые ждут своих соседей и т.д.

Это ему гарантировано. Поэтому рассмотренная выше реализация waiter_with_queue защищает от голодания в том смысле, что рано или поздно философ поест. И утилизация ресурсов может быть не оптимальной временами. Но периоды голодания могут быть довольно долгими.

Вместо очереди там используется приоритизация запросов от философов с учетом времени их голодания. Дабы решить эту проблему я реализовал еще одно решение, waiter_with_timestamp (его код можно найти здесь). Чем дольше философ голодает, тем приоритетнее его запрос.

по большому счету главное в нем — это тот же самый трюк с набором mbox-ов для несуществующих "вилок", который мы уже обсудили в разговоре про реализацию waiter_with_queue. Рассматривать код этого решения мы не будем, т.к.

Несколько деталей реализации, на которые хотелось бы обратить внимание

эти детали демонстрируют интересные особенности SObjectizer-а. Есть несколько деталей в реализациях на базе Акторов, на которые хотелось бы обратить внимание, т.к.

Рабочий контекст для акторов

Что вовсе не означает, что в SObjectizer-е все акторы работают только на одной единственной нити. В рассмотренных реализациях все основные акторы (fork_t, philosopher_t, waiter_t) работали на контексте одной общей рабочей нити. В SObjectizer-е можно привязывать акторов к разным контекстам, что можно увидеть, например, в коде функции run_simulation() в решении no_waiter_simple.

Код run_simulation из no_waiter_simple

void run_simulation( so_5::environment_t & env, const names_holder_t & names )
{ env.introduce_coop( [&]( so_5::coop_t & coop ) { coop.make_agent_with_binder< trace_maker_t >( so_5::disp::one_thread::create_private_disp( env )->binder(), names, random_pause_generator_t::trace_step() ); coop.make_agent_with_binder< completion_watcher_t >( so_5::disp::one_thread::create_private_disp( env )->binder(), names ); const auto count = names.size(); std::vector< so_5::agent_t * > forks( count, nullptr ); for( std::size_t i{}; i != count; ++i ) forks[ i ] = coop.make_agent< fork_t >(); for( std::size_t i{}; i != count; ++i ) coop.make_agent< philosopher_t >( i, forks[ i ]->so_direct_mbox(), forks[ (i + 1) % count ]->so_direct_mbox(), default_meals_count ); });
}

Они будут работать на отдельных рабочих контекстах. В этой функции создаются дополнительные акторы типов trace_maker_t и completion_watcher_t. Что означает, что данные акторы будут работать как активные объекты: каждый будет владеть собственной рабочей нитью. Для этого создается два экземпляра диспетчера типа one_thread и акторы привязываются к этим экземплярам диспетчеров.

При этом разработчик может создать в своем приложении столько экземпляров диспетчеров, сколько разработчику нужно. SObjectizer предоставляет набор из нескольких разных диспетчеров, которые могут использоваться прямо "из коробки".

Скажем, мы легко можем запустить акторов fork_t на одном пуле рабочих нитей, а акторов philosopher_t на другом пуле. Но самое важное то, что в самом акторе ничего не нужно менять, чтобы заставить его работать на другом диспетчере.

Код run_simulation из no_waiter_simple_tp

void run_simulation( so_5::environment_t & env, const names_holder_t & names )
{ env.introduce_coop( [&]( so_5::coop_t & coop ) { coop.make_agent_with_binder< trace_maker_t >( so_5::disp::one_thread::create_private_disp( env )->binder(), names, random_pause_generator_t::trace_step() ); coop.make_agent_with_binder< completion_watcher_t >( so_5::disp::one_thread::create_private_disp( env )->binder(), names ); const auto count = names.size(); // Параметры для настройки поведения thread_pool-диспетчера. so_5::disp::thread_pool::bind_params_t bind_params; bind_params.fifo( so_5::disp::thread_pool::fifo_t::individual ); std::vector< so_5::agent_t * > forks( count, nullptr ); // Создаем пул нитей для акторов-вилок. auto fork_disp = so_5::disp::thread_pool::create_private_disp( env, 3u // Размер пула. ); for( std::size_t i{}; i != count; ++i ) // Каждая вилка будет привязана к пулу. forks[ i ] = coop.make_agent_with_binder< fork_t >( fork_disp->binder( bind_params ) ); // Создаем пул нитей для акторов-философов. auto philosopher_disp = so_5::disp::thread_pool::create_private_disp( env, 6u // Размер пула. ); for( std::size_t i{}; i != count; ++i ) coop.make_agent_with_binder< philosopher_t >( philosopher_disp->binder( bind_params ), i, forks[ i ]->so_direct_mbox(), forks[ (i + 1) % count ]->so_direct_mbox(), default_meals_count ); });
}

И при этом нам не потребовалось поменять ни строчки в классах fork_t и philosopher_t.

Трассировка смены состояний акторов

Если посмотреть в реализацию философов в упомянутой выше статье Modern dining philosophers можно легко увидеть код, относящийся к трассировке действий философа, например:

void doEat() { eventLog_.startActivity(ActivityType::eat); wait(randBetween(10, 50)); eventLog_.endActivity(ActivityType::eat);

Но трассировка, тем не менее, выполняется. В тоже время в показанных выше реализациях на базе SObjectizer подобного кода нет. За счет чего?

Такой слушатель реализуется как наследник класса agent_state_listener_t. Дело в том, что в SObjectizer-е есть специальная штука: слушатель состояний агента. Когда слушатель связывается с агентом, то SObjectizer автоматически уведомляет слушателя о каждом изменении состояния агента.

Установку слушателя можно увидеть в конструкторе агентов greedy_philosopher_t и philosopher_t:

philosopher_t(...) ... { so_add_destroyable_listener( state_watcher_t::make( so_environment(), index ) ); }

Здесь state_watcher_t — это и есть нужная мне реализация слушателя.

Определение state_watcher_t


class state_watcher_t final : public so_5::agent_state_listener_t
{ const so_5::mbox_t m_mbox; const std::size_t m_index; state_watcher_t( so_5::mbox_t mbox, std::size_t index ); public : static auto make( so_5::environment_t & env, std::size_t index ) { return so_5::agent_state_listener_unique_ptr_t{ new state_watcher_t{ trace_maker_t::make_mbox(env), index } }; } void changed( so_5::agent_t &, const so_5::state_t & state ) override;
};

И уже внутри state_watcher_t::changed инициируются действия для трассировки действий актора-философа. Когда экземпляр state_watcher_t связан с агентом SObjectizer вызывает метод changed() при смене состояния агента.

Фрагмент реализации state_watcher_t::changed

void state_watcher_t::changed( so_5::agent_t &, const so_5::state_t & state ) { const auto detect_label = []( const std::string & name ) {...}; const char state_label = detect_label( state.query_name() ); if( '?' == state_label ) return; so_5::send< trace::state_changed_t >( m_mbox, m_index, state_label );
}

Посмотрим на те же самые решения (no_waiter_dijkstra, no_waiter_simple, waiter_with_timestamps) при реализации которых применяются std::thread и SObjectizer-овские mchain-ы (которые, по сути, есть CSP-шные каналы). Теперь мы поговорим о реализациях, которые не используют акторов вообще. Причем, подчеркну особо, в CSP-шных решениях используется тот же самый набор сообщений (все те же take_t, taken_t, busy_t, put_t).

Поэтому каждая вилка, каждый философ и каждый официант реализуется отдельным объектом std::thread. В CSP-подходе вместо "процессов" используются нити ОС.

Решение Дейкстры

Исходный код этого решения можно увидеть здесь.

Нить для вилки

Что реализуется функцией fork_process следующего вида: Нить для вилки в решении Дейкстры работает очень просто: цикл чтения сообщений из входного канала + обработка сообщений типа take_t и put_t.

void fork_process( so_5::mchain_t fork_ch )
{ // Состояние вилки: занята или нет. bool taken = false; // Очередь ждущих философов. std::queue< so_5::mbox_t > wait_queue; // Читаем и обрабатываем сообщения пока канал не закроют. so_5::receive( so_5::from( fork_ch ), [&]( so_5::mhood_t<take_t> cmd ) { if( taken ) // Вилка занята, философ должен ждать в очереди. wait_queue.push( cmd->m_who ); else { // Философ может взять вилку. taken = true; so_5::send< taken_t >( cmd->m_who ); } }, [&]( so_5::mhood_t<put_t> ) { if( wait_queue.empty() ) taken = false; // Вилка больше никому не нужна. else { // Вилку нужно отдать первому философу из очереди. const auto who = wait_queue.front(); wait_queue.pop(); so_5::send< taken_t >( who ); } } );
}

У функции fork_process всего один аргумент: входной канал, который создается где-то в другом месте.

Этот цикл реализуется всего одним вызовом функции receive(): Самое интересное в fork_process — это "цикл" выборки сообщений из канала до тех пор, пока канал не будет закрыт.

so_5::receive( so_5::from( fork_ch ), [&]( so_5::mhood_t<take_t> cmd ) {...}, [&]( so_5::mhood_t<put_t> ) {...} );

Эта версия читает сообщения из канала пока канал не будет закрыт. В SObjectizer-е есть несколько версий функции receive() и здесь мы видим одну из них. Если обработчик найден, то он вызывается и сообщение обрабатывается. Для каждого прочитанного сообщения ищется обработчик. Если не найден, то сообщение просто выбрасывается.

Эти лямбды выглядят как близнецы братья соответствующих обработчиков в акторе fork_t из решения на базе Акторов. Обработчики сообщений задаются в виде лямбда-функций. Что, в принципе, не удивительно.

Нить для философа

У этой функции достаточно объемный код, поэтому мы будем разбираться с ним по частям. Логика работы философа реализована в функции philosopher_process.

Полный код philosopher_process

oid philosopher_process( trace_maker_t & tracer, so_5::mchain_t control_ch, std::size_t philosopher_index, so_5::mbox_t left_fork, so_5::mbox_t right_fork, int meals_count )
{ int meals_eaten{ 0 }; random_pause_generator_t pause_generator; // Этот канал будет использован для получения ответов от вилок. auto self_ch = so_5::create_mchain( control_ch->environment() ); while( meals_eaten < meals_count ) { tracer.thinking_started( philosopher_index, thinking_type_t::normal ); // Имитируем размышления приостанавливая нить. std::this_thread::sleep_for( pause_generator.think_pause( thinking_type_t::normal ) ); // Пытаемся взять левую вилку. tracer.take_left_attempt( philosopher_index ); so_5::send< take_t >( left_fork, self_ch->as_mbox(), philosopher_index ); // Запрос отправлен, ждем ответа. so_5::receive( so_5::from( self_ch ).handle_n( 1u ), [&]( so_5::mhood_t<taken_t> ) { // Взяли левую вилку, пытаемся взять правую. tracer.take_right_attempt( philosopher_index ); so_5::send< take_t >( right_fork, self_ch->as_mbox(), philosopher_index ); // Запрос отправлен, ждем ответа. so_5::receive( so_5::from( self_ch ).handle_n( 1u ), [&]( so_5::mhood_t<taken_t> ) { // У нас обе вилки. Можно поесть. tracer.eating_started( philosopher_index ); // Имитируем поглощение пищи приостанавливая нить. std::this_thread::sleep_for( pause_generator.eat_pause() ); // На шаг ближе к финалу. ++meals_eaten; // После еды возвращаем правую вилку. so_5::send< put_t >( right_fork ); } ); // А следом возвращаем и левую. so_5::send< put_t >( left_fork ); } ); } // Сообщаем о том, что мы закончили. tracer.philosopher_done( philosopher_index ); so_5::send< philosopher_done_t >( control_ch, philosopher_index );
}

Давайте начнем с прототипа функции:

void philosopher_process( trace_maker_t & tracer, so_5::mchain_t control_ch, std::size_t philosopher_index, so_5::mbox_t left_fork, so_5::mbox_t right_fork, int meals_count )

Смысл и назначение некоторых из этих параметров придется пояснить.

Поэтому в коде философа приходится делать вот такие вставки: Поскольку мы не используем SObjectizer-овских агентов, то у нас нет возможности снимать след работы философа через слушателя состояний агента, как это делалось в варианте на Actor-ах.

tracer.thinking_started( philosopher_index, thinking_type_t::normal );

И аргумент tracer как раз является ссылкой на объект, который занимается трассировкой работы философов.

Этот канал затем будет использоваться для определения момента завершения работы всех философов. Аргумент control_ch задает канал, в который должно быть записано сообщение philosopher_done_t после того, как философ съест все, что ему положено.

Именно в эти каналы будут отсылаться сообщения take_t и put_t. Аргументы left_fork и right_fork задают каналы для взаимодействия с вилками. Но если это каналы, то почему используется тип mbox_t вместо mchain_t?

Но ответ на него мы увидим ниже, при обсуждении другого решения. Это хороший вопрос! Пока же можно сказать, что mchain — это что-то вроде разновидности mbox-а, поэтому ссылки на mchain-ы можно передавать через объекты mbox_t.

Далее определяется ряд переменных, которые формируют состояние философа:

int meals_eaten{ 0 }; random_pause_generator_t pause_generator; auto self_ch = so_5::create_mchain( control_ch->environment() );

Это персональный канал философа, через который философ будет получать ответные сообщения от вилок. Наверное наиболее важная переменная — это self_ch.

Т.е. Ну а теперь мы можем перейти к основной логике работы философа. к циклу повторения таких операций как размышления, захват вилок и поглощение пищи.

Можно отметить, что в отличии от решения на базе Акторов, для имитации длительных операций здесь используется this_thread::sleep_for вместо отложенных сообщений.

Попытка взять вилку выглядит практически так же, как и в случае с Акторами:

so_5::send< take_t >( left_fork, self_ch->as_mbox(), philosopher_index );

Но в нем есть поле типа mbox_t, тогда как self_ch имеет тип mchain_t. Здесь используется все тот же тип take_t. Поэтому приходится преобразовывать ссылку на канал в ссылку на почтовый ящик через вызов as_mbox().

Далее можно увидеть вызов receive():

so_5::receive( so_5::from( self_ch ).handle_n( 1u ), [&]( so_5::mhood_t<taken_t> ) {...} );

Ну или если канал будет закрыт. Этот вызов возвращает управление только когда один экземпляр taken_t будет извлечен и обработан. В общем, мы здесь ждем поступление нужного нам ответа от вилки.

Хотя стоит заострить внимание на вложенном вызове receive() для одного и того же канала: В общем-то это практически все, что можно было бы рассказать про philosopher_process.

so_5::receive( so_5::from( self_ch ).handle_n( 1u ), [&]( so_5::mhood_t<taken_t> ) { ... so_5::receive( so_5::from( self_ch ).handle_n( 1u ), [&]( so_5::mhood_t<taken_t> ) {...} ); ... } );

Эта вложенность позволяет записать логику работы философа в простом и более-менее компактном виде.

Функция запуска симуляции

Но вот в случае с CSP-подходом на run_simulation() имеет смысл остановиться. При обсуждении решений на базе Акторов мы не останавливались на разборе функций run_simulation(), поскольку там ничего особо интересного или важного не было. В том числе для того, чтобы в очередной раз убедиться в том, насколько непросто писать корректный многопоточный код (кстати говоря вне зависимости от языка программирования).

Полный код функции run_simulation

void run_simulation( so_5::environment_t & env, const names_holder_t & names ) noexcept
{ const auto table_size = names.size(); const auto join_all = []( std::vector<std::thread> & threads ) { for( auto & t : threads ) t.join(); }; trace_maker_t tracer{ env, names, random_pause_generator_t::trace_step() }; // Создаем вилки. std::vector< so_5::mchain_t > fork_chains; std::vector< std::thread > fork_threads( table_size ); for( std::size_t i{}; i != table_size; ++i ) { // Персональный канал для очередной вилки. fork_chains.emplace_back( so_5::create_mchain(env) ); // Рабочая нить для очередной вилки. fork_threads[ i ] = std::thread{ fork_process, fork_chains.back() }; } // Канал для получения уведомлений от философов. auto control_ch = so_5::create_mchain( env ); // Создаем философов. const auto philosopher_maker = [&](auto index, auto left_fork_idx, auto right_fork_idx) { return std::thread{ philosopher_process, std::ref(tracer), control_ch, index, fork_chains[ left_fork_idx ]->as_mbox(), fork_chains[ right_fork_idx ]->as_mbox(), default_meals_count }; }; std::vector< std::thread > philosopher_threads( table_size ); for( std::size_t i{}; i != table_size - 1u; ++i ) { // Запускаем очередного философа на отдельной нити. philosopher_threads[ i ] = philosopher_maker( i, i, i+1u ); } // Последний философ должен захватывать вилки в обратном порядке. philosopher_threads[ table_size - 1u ] = philosopher_maker( table_size - 1u, table_size - 1u, 0u ); // Ждем пока все философы закончат. so_5::receive( so_5::from( control_ch ).handle_n( table_size ), [&names]( so_5::mhood_t<philosopher_done_t> cmd ) { fmt::print( "{}: done\n", names[ cmd->m_philosopher_index ] ); } ); // Дожидаемся завершения рабочих нитей философов. join_all( philosopher_threads ); // Принудительно закрываем каналы для вилок. for( auto & ch : fork_chains ) so_5::close_drop_content( ch ); // После чего дожидаемся завершения рабочих нитей вилок. join_all( fork_threads ); // Показываем результат. tracer.done(); // И останавливаем SObjectizer. env.stop();
}

Поэтому я разберу только некоторые моменты. В принципе, в основном код run_simulation() должен быть более-менее понятен.

Этот фрагмент как раз отвечает за их создание: Нам требуются рабочие нити для вилок.

std::vector< so_5::mchain_t > fork_chains;
std::vector< std::thread > fork_threads( table_size ); for( std::size_t i{}; i != table_size; ++i )
{ fork_chains.emplace_back( so_5::create_mchain(env) ); fork_threads[ i ] = std::thread{ fork_process, fork_chains.back() };
}

Каналы потребуются ниже для передачи ссылок на них философам. При этом нам нужно сохранять и каналы, созданные для вилок, и сами рабочие нити. А рабочие нити потребуются для того, чтобы затем вызвать join для них.

нам нужно будет затем вызывать join: После чего мы создаем рабочие нити для философов и так же собираем рабочие нити в контейнер, т.к.

std::vector< std::thread > philosopher_threads( table_size );
for( std::size_t i{}; i != table_size - 1u; ++i )
{ philosopher_threads[ i ] = philosopher_maker( i, i, i+1u );
}
philosopher_threads[ table_size - 1u ] = philosopher_maker( table_size - 1u, table_size - 1u, 0u );

Дожидаемся момента завершения симуляции с помощью этого фрагмента: Далее мы должны дать философам некоторое время для выполнения симуляции.

so_5::receive( so_5::from( control_ch ).handle_n( table_size ), [&names]( so_5::mhood_t<philosopher_done_t> cmd ) { fmt::print( "{}: done\n", names[ cmd->m_philosopher_index ] ); } );

Этот вариант receive() возвращает управление только после получения table_size сообщений типа philosopher_done_t.

Ну а после получения всех philosopher_done_t остается выполнить очистку ресурсов.

Делаем join для всех рабочих нитей философов:

join_all( philosopher_threads );

Но просто вызывать join нельзя, т.к. После чего нужно сделать join для всех нитей вилок. Ведь рабочие нити вилок спят внутри вызовов receive() и никто их не разбудит. тогда мы тупо зависнем. Поэтому нам нужно сперва закрыть все каналы для вилок и лишь затем вызывать join:

for( auto & ch : fork_chains ) so_5::close_drop_content( ch ); join_all( fork_threads );

Вот теперь главные операции по очистке ресурсов можно считать законченными.

Несколько слов о noexcept

Дело в том, что в ней exception-safety не обеспечивается от слова совсем. Надеюсь, что код run_simulation сейчас полностью понятен и я могу попробовать объяснить, почему эта функция помечена как noexcept. Поэтому самая лучшая реакция на возникновение исключения в такой тривиальной реализации — это принудительное завершение всего приложения.

Но почему run_simulation не обеспечивает безопасность по отношению к исключениям?

Дабы обеспечить хоть какую-нибудь exception-safety нам нужно принудительно завершить те нити, которые уже были запущены. Давайте представим, что мы создаем рабочие нити для вилок и при создании очередной нити у нас выскакивает исключение. И кто-то может подумать, что для этого достаточно переписать код запуска нитей для вилок следующим образом:

try
{ for( std::size_t i{}; i != table_size; ++i ) { fork_chains.emplace_back( so_5::create_mchain(env) ); fork_threads[ i ] = std::thread{ fork_process, fork_chains.back() }; }
}
catch( ... )
{ for( std::size_t i{}; i != fork_chains.size(); ++i ) { so_5::close_drop_content( fork_chains[ i ] ); if( fork_threads[ i ].joinable() ) fork_threads[ i ].join(); } throw;
}

Т.к. К сожалению, это очевидное и неправильное решение. Поэтому лучше использовать что-то вроде вот такого: оно окажется бесполезным если исключение возникнет уже после того, как мы создадим все нити для вилок.

struct fork_threads_stuff_t { std::vector< so_5::mchain_t > m_fork_chains; std::vector< std::thread > m_fork_threads; fork_threads_stuff_t( std::size_t table_size ) : m_fork_threads( table_size ) {} ~fork_threads_stuff_t() { for( std::size_t i{}; i != m_fork_chains.size(); ++i ) { so_5::close_drop_content( m_fork_chains[ i ] ); if( m_fork_threads[ i ].joinable() ) m_fork_threads[ i ].join(); } } void run() { for( std::size_t i{}; i != m_fork_threads.size(); ++i ) { m_fork_chains.emplace_back( so_5::create_mchain(env) ); m_fork_threads[ i ] = std::thread{ fork_process, m_fork_chains.back() }; } }
} fork_threads_stuff{ table_size }; // Преаллоцируем нужные ресурсы.
fork_threads_stuff.run(); // Создаем каналы и запускаем нити. // Вся нужная очистка произойдет в деструкторе fork_threads_stuff.

Ну или можно воспользоваться трюками, которые позволяют выполнять нужный нам код при выходе из скоупа (например, Boost-овским ScopeExit-ом, GSL-овским finally() и им подобным).

И решать ее нужно будет подобным образом. Аналогичная проблема существует и с запуском нитей для философов.

Что не есть хорошо для функции, написанной исключительно в демонстрационных целях и не претендующей на продакшен-качество. Однако, если поместить весь необходимый код по обеспечению exception-safety в run_simulation(), то код run_simulation() окажется и объемнее, и сложнее в восприятии. ИМХО, для такого рода демонстрационных примеров это вполне себе разумный вариант. Поэтому я решил забить на обеспечение exception-safety внутри run_simulation() и пометил функцию как noexcept, что приведет к вызову std::terminate в случае возникновения исключения.

Неопытный программист может посчитать, что достаточно только вызвать join, хотя на самом деле нужно еще предварительно закрыть каналы перед вызовом join. Тем не менее, проблема в том, что нужно иметь соответствующий опыт чтобы понять, какой код по очистке ресурсов будет работать, а какой нет. И такую проблему в коде будет очень непросто выловить.

Простое решение без использования арбитра (официанта)

Теперь мы можем рассмотреть, как в CSP-подходе будет выглядеть простое решение с возвратом вилок при неудачном захвате, но без арбитра.

Исходный код этого решения можно найти здесь.

Нить для вилки

Нить для вилки реализуется функцией fork_process, которая выглядит следующим образом:

void fork_process( so_5::mchain_t fork_ch )
{ // Состояние вилки: захвачена или свободна. bool taken = false; // Обрабатываем сообщения пока канал не закроют. so_5::receive( so_5::from( fork_ch ), [&]( so_5::mhood_t<take_t> cmd ) { if( taken ) so_5::send< busy_t >( cmd->m_who ); else { taken = true; so_5::send< taken_t >( cmd->m_who ); } }, [&]( so_5::mhood_t<put_t> ) { if( taken ) taken = false; } );
}

Можно увидеть, что эта fork_process проще, чем аналогичная в решении Дейкстры (ту же самую картину мы могли наблюдать, когда рассматривали решения на базе Акторов).

Нить для философа

Нить для философа реализуется функцией philosopher_process, которая оказывается несколько сложнее, чем ее аналог в решении Дейкстры.

Полный код philosopher_process

void philosopher_process( trace_maker_t & tracer, so_5::mchain_t control_ch, std::size_t philosopher_index, so_5::mbox_t left_fork, so_5::mbox_t right_fork, int meals_count )
{ int meals_eaten{ 0 }; // Этот флаг потребуется для трассировки действий философа. thinking_type_t thinking_type{ thinking_type_t::normal }; random_pause_generator_t pause_generator; // Канал для получения ответов от вилок. auto self_ch = so_5::create_mchain( control_ch->environment() ); while( meals_eaten < meals_count ) { tracer.thinking_started( philosopher_index, thinking_type ); // Имитируем размышления приостанавливая нить. std::this_thread::sleep_for( pause_generator.think_pause( thinking_type ) ); // На случай, если захватить вилки не получится. thinking_type = thinking_type_t::hungry; // Пытаемся взять левую вилку. tracer.take_left_attempt( philosopher_index ); so_5::send< take_t >( left_fork, self_ch->as_mbox(), philosopher_index ); // Запрос отправлен, ждем ответа. so_5::receive( so_5::from( self_ch ).handle_n( 1u ), []( so_5::mhood_t<busy_t> ) { /* ничего не нужно делать */ }, [&]( so_5::mhood_t<taken_t> ) { // Левая вилка взята, попробуем взять правую. tracer.take_right_attempt( philosopher_index ); so_5::send< take_t >( right_fork, self_ch->as_mbox(), philosopher_index ); // Запрос отправлен, ждем ответа. so_5::receive( so_5::from( self_ch ).handle_n( 1u ), []( so_5::mhood_t<busy_t> ) { /* ничего не нужно делать */ }, [&]( so_5::mhood_t<taken_t> ) { // У нас обе вилки, можно поесть. tracer.eating_started( philosopher_index ); // Имитируем поглощение пищи приостанавливая нить. std::this_thread::sleep_for( pause_generator.eat_pause() ); // На шаг ближе к финалу. ++meals_eaten; // После еды нужно вернуть правую вилку. so_5::send< put_t >( right_fork ); // Следующий период размышлений должен быть помечен как "normal". thinking_type = thinking_type_t::normal; } ); // Левую вилку нужно вернуть в любом случае. so_5::send< put_t >( left_fork ); } ); } // Уведомляем о завершении работы. tracer.philosopher_done( philosopher_index ); so_5::send< philosopher_done_t >( control_ch, philosopher_index );
}

Но есть два важных отличия. В общем-то код philosopher_process очень похож на код philosopher_process из решения Дейкстры.

Она нужна для того, чтобы формировать правильный след работы философа, а так же для того, чтобы вычислять паузы при имитации "раздумий" философа. Во-первых, это переменная thinking_type.

Мы их можем увидеть при вызове receive(): Во-вторых, это обработчики для сообщений busy_t.

so_5::receive( so_5::from( self_ch ).handle_n( 1u ), []( so_5::mhood_t<busy_t> ) { /* ничего не нужно делать */ }, [&]( so_5::mhood_t<taken_t> ) { ... so_5::receive( so_5::from( self_ch ).handle_n( 1u ), []( so_5::mhood_t<busy_t> ) { /* ничего не нужно делать */ }, [&]( so_5::mhood_t<taken_t> ) {...} );

Поэтому при получении busy_t ничего не нужно делать. Да, обработчики для busy_t пусты, но это потому, что все необходимые действия либо уже были сделаны перед вызовом receive(), либо будут сделаны после выхода из receive(). их присутствие запрещает receive() выбрасывать экземпляры busy_t без обработки. Но сами обработчики должны быть определены, т.к. Благодаря присутствию таких обработчиков receive() возвращает управление когда в канал приходит сообщение busy_t.

Решение с официантом и временными отметками

Разбирая решения на базе Акторов речь шла о решениях с арбитром (официантом): мы рассматривали решение waiter_with_queue, в котором официант использует очередь заявок, а так же упоминалось решение waiter_with_timestamps. На базе CSP-подхода было сделано еще одно решение, которое я бы хотел здесь кратко осветить. Оба эти решения использовали один и тот же трюк: официант создавал набор mbox-ов для несуществующих вилок, эти mbox-ы раздавались философам, но сообщения из mbox-ов обрабатывались официантом.

Но может ли официант создать набор mchain-ов которые будут использоваться философами и из которых официант будет читать сообщения, адресованные вилкам? Похожий трюк нужен и в CSP-подходе для того, чтобы я смог переиспользовать уже существующую реализацию philosopher_process из решения no_waiter_simple.

К сожалению, нет.

Проблема в том, чтобы читать сообщения сразу из нескольких mchain-ов. Создать набор mchain-ов не проблема.

В SObjectizer-е есть функция select(), которая позволяет это делать, например, она позволяет написать так:

so_5::select( so_5::from_all(), case_(ch1, one_handler_1, one_handler_2, one_handler_3, ...), case_(ch2, two_handler_1, two_handler_2, two_handler_3, ...), ...);

Тогда как в моих решениях задачи "обедающих философов" этот список становится известен только во время исполнения. Но для select() нужно, чтобы список каналов и обработчиков сообщений из них был доступен в компайл-тайм. Поэтому в CSP-подходе нельзя в чистом виде переиспользовать трюк из подхода на базе Акторов.

Но мы можем его переосмыслить.

И поэтому нам нужно как-то эти индексы получить. Итак, суть в том, что в исходных сообщениях take_t и put_t нет поля для хранения индекса вилки. Раз уж мы не можем запихнуть индексы внутрь take_t и put_t, то давайте сделаем расширенные версии этих сообщений:

struct extended_take_t final : public so_5::message_t
{ const so_5::mbox_t m_who; const std::size_t m_philosopher_index; const std::size_t m_fork_index; extended_take_t( so_5::mbox_t who, std::size_t philosopher_index, std::size_t fork_index ) : m_who{ std::move(who) } , m_philosopher_index{ philosopher_index } , m_fork_index{ fork_index } {}
}; struct extended_put_t final : public so_5::message_t
{ const std::size_t m_fork_index; extended_put_t( std::size_t fork_index ) : m_fork_index{ fork_index } {}
};

В данном же случае наследование используется просто для того, чтобы продемонстрировать такой способ определения SObjectizer-овских сообщений. Вообще говоря, нет надобности наследовать типы сообщений от so_5::message_t, хотя я обычно как раз использую такое наследование (у него есть некоторые небольшие бенефиты).

Значит нам нужно научиться перехватывать сообщения take_t и put_t, преобразовывать их в extended_take_t и extended_put_t, отсылать новые сообщения официанту. Теперь нужно сделать так, чтобы официант читал именно расширенные версии сообщений вместо оригинальных.

Всего лишь 🙂 Для этого нам потребуется собственный mbox.

Код собственного mbox-а

class wrapping_mbox_t final : public so_5::extra::mboxes::proxy::simple_t
{ using base_type_t = so_5::extra::mboxes::proxy::simple_t; // Куда сообщения должны доставляться. const so_5::mbox_t m_target; // Индекс вилки, который должен использоваться в новых сообщениях. const std::size_t m_fork_index; // Типы сообщений для перехвата. static std::type_index original_take_type; static std::type_index original_put_type; public : wrapping_mbox_t( const so_5::mbox_t & target, std::size_t fork_index ) : base_type_t{ target } , m_target{ target } , m_fork_index{ fork_index } {} // Это основной метод so_5::abstract_message_box_t для доставки сообщений. // Переопределяем его для перехвата и преобразования сообщений. void do_deliver_message( const std::type_index & msg_type, const so_5::message_ref_t & message, unsigned int overlimit_reaction_deep ) const override { if( original_take_type == msg_type ) { // Получаем доступ к исходному сообщению. const auto & original_msg = so_5::message_payload_type<::take_t>:: payload_reference( *message ); // Шлем новое сообщение вместо старого. so_5::send< extended_take_t >( m_target, original_msg.m_who, original_msg.m_philosopher_index, m_fork_index ); } else if( original_put_type == msg_type ) { // Шлем новое сообщение вместо старого. so_5::send< extended_put_t >( m_target, m_fork_index ); } else base_type_t::do_deliver_message( msg_type, message, overlimit_reaction_deep ); } // Фабричный метод чтобы было проще создавать wrapping_mbox_t. static auto make( const so_5::mbox_t & target, std::size_t fork_index ) { return so_5::mbox_t{ new wrapping_mbox_t{ target, fork_index } }; }
}; std::type_index wrapping_mbox_t::original_take_type{ typeid(::take_t) };
std::type_index wrapping_mbox_t::original_put_type{ typeid(::put_t) };

Без использования этой заготовки мне пришлось бы наследоваться напрямую от so_5::abstract_message_box_t и реализовывать ряд чистых виртуальных методов. Здесь использовался самый простой способ создания собственного mbox-а: в сопутствующем проекте so_5_extra есть заготовка, которую можно переиспользовать и сохранить себе кучу времени.

Так что мы теперь можем создать набор экземпляров этого класса, ссылки на которые и будут раздаваться философам. Как бы то ни было, теперь есть класс wrapping_mbox_t. Поэтому функция waiter_process, которая является главной функцией нити официанта, будет иметь вот такой простой вид: Философы будут отсылать сообщения в wrapping_mbox, а эти сообщения будут преобразовываться и переадресовываться в единственный mchain официанта.

void waiter_process( so_5::mchain_t waiter_ch, details::waiter_logic_t & logic )
{ // Получаем и обрабатываем сообщения пока канал не закроют. so_5::receive( so_5::from( waiter_ch ), [&]( so_5::mhood_t<details::extended_take_t> cmd ) { logic.on_take_fork( std::move(cmd) ); }, [&]( so_5::mhood_t<details::extended_put_t> cmd ) { logic.on_put_fork( std::move(cmd) ); } );
}

Интересующиеся могут посмотреть код решения waiter_with_timestamps здесь. Конечно же, прикладная логика официанта реализована в другом месте и ее код не так прост и короток, но мы не будем туда погружаться.

Вот сейчас мы можем ответить на вопрос: "Почему каналы для вилок передаются в philosopher_process как mbox-ы?" Это потому, что для решения waiter_with_timestamps был реализован собственный mbox, а не mchain.

Но это потребовало бы несколько больше работы, т.к. Конечно же, можно было бы создать и собственный mchain. Так что для экономии времени я просто остановился на mbox-ах вместо mchain-ов. в so_5_extra пока нет такой же заготовки для собственных mchain-ов (может быть со временем появится).

Свою задачу я видел не в том, чтобы сделать максимально эффективные решения. Вот, пожалуй, и все, что хотелось рассказать про реализованные на базе Акторов и CSP решения. Надеюсь, что кому-то это было интересно. А в том, чтобы показать, как именно они могут выглядеть.

Все идет к тому, что в ближайшее время начнется работа над следующей "большой" версией SObjectizer — веткой 5. Позволю себе отвлечь внимание тех, кто интересуется SObjectizer-ом. 5. 6, нарушающей совместимость с веткой 5. Более-менее актуальный список того, что поменяется в SO-5. Желающие сказать свое веское слово по этому поводу, могут сделать это здесь (или здесь). 6 можно найти здесь (туда же можно добавить и свои пожелания).

На этом у меня все, большое спасибо всем читателям за потраченное на данную статью время!

Слово "современные" в заголовке взято в кавычки потому, что в самих решениях нет ничего современного. PS. Разве что за исключением использования кода на C++14.

Показать больше

Похожие публикации

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *

Кнопка «Наверх»