Хабрахабр

Сытые философы или конкурентное программирование на .NET

Net, на примере проблемы обедающих философов. Давайте посмотрим как устроено конкурентное и параллельное программирование в . Статья может быть полезна для первого знакомства или для того, чтобы освежить свои знания. План такой, от синхронизации потоков/процессов, до модели акторов (в следующих частях).

Транзисторы достигают своего минимального размера, закон Мура упирается в ограничение скорости света и поэтому рост наблюдается в количестве, транзисторов можно делать больше. Зачем вообще уметь это? В такой ситуации "обычное" программирование, когда у нас один выполняющий поток, уже не эффективно. При этом количество данных растет, а пользователи ожидают немедленной реакции систем. Причем, проблема эта существует на разных уровнях: на уровне потоков, на уровне процессов, на уровне машин в сети (распределенные системы). Нужно как-то решать проблему одновременного или конкурентного выполнения. NET есть качественные, проверенные временем, технологии для быстрого и эффективного решения таких задач. В .

Задача

Устоявшаяся формулировка такая. Эдсгер Дейкстра задавал эту проблему своим ученикам еще в 1965. Они сидят за круглым столом, вилки между ними. Есть некоторое (обычно пять) количество философов и столько же вилок. Чтобы поесть философу, нужно взять две вилки (последний делит вилку с первым). Философы могут есть из своих тарелок с бесконечной пищей, думать или ждать. Все философы безмолвные. Взять и положить вилку — два раздельных действия. Задача найти такой алгоритм, чтобы все они думали и были сыты спустя даже 54 года.

Вилки лежат на общем столе и философы просто их берут, когда они есть, и кладут обратно. Сначала попробуем решить эту задачу через использование разделяемого места. что делать если вилки нет? Здесь появляются проблемы с синхронизацией, когда именно брать вилки? Но сначала давайте запустим философов. и др.

Run метод: Для запуска потоков используем пул потоков через Task.

var cancelTokenSource = new CancellationTokenSource();
Action<int> create = (i) => RunPhilosopher(i, cancelTokenSource.Token);
for (int i = 0; i < philosophersAmount; i++) { int icopy = i; // Поместить задачу в очередь пула потоков. Метод RunDeadlock не запускаеться // сразу, а ждет своего потока. Асинхронный запуск. philosophers[i] = Task.Run(() => create(icopy), cancelTokenSource.Token);
}

У этого пула есть очередь с задачами и CLR создает или удаляет потоки в зависимости от количества этих задач. Пул потоков создан для оптимизации создания и удаления потоков. Этот пул стоит использовать почти всегда, т.к. Один пул на все AppDomain'ы. Можно и без пула, но тогда придется напрямую использовать Thread, это целесообразно для случаев, когда нужно поменять приоритет потоку, когда у нас долгая операция, для Foreground потока и др. не нужно заморачиваться с созданием, удалением потоков, их очередями и пр.

Threading. Другими словами, System. Task класс — это тот же Thread, но со всякими удобствами: возможность запускать таск после блока других тасков, возвращать их из функций, удобно их прерывать и мн. Tasks. Они нужны для поддержки async/await конструкций (Task-based Asynchronous Pattern, синтаксический сахар для ожидания IO операции). др. Об этом еще поговорим.

CancelationTokenSource здесь нужен, чтобы поток мог сам завершится по сигналу вызывающего потока.

Проблемы с синхронизацией

Блокированные философы

Хорошо, мы умеем создавать потоки, давайте попробуем пообедать:

// Кто какие вилки взял. К примеру: 1 1 3 3 - 1й и 3й взяли первые две пары.
private int[] forks = Enumerable.Repeat(0, philosophersAmount).ToArray(); // То же, что RunPhilosopher()
private void RunDeadlock(int i, CancellationToken token)
}

Взятие одной вилки атомарно, т.е. Здесь мы сначала пробуем взять левую, а потом правую вилки и если получилось, то едим и кладем их обратно. Для этого Interlocked. два потока не могут взять одну одновременно (неверно: первый читает, что вилка свободна, второй — тоже, первый берет, второй берет). А SpinWait эквивалентно конструкции while(true) только с небольшой "магией" — поток занимает процессор (Thread. CompareExchange, который должен быть реализован с помощью инструкции процессора (TSL, XCHG), которая блокирует участок памяти для атомарного последовательного чтения и записи. Yeild) или засыпает (Thread. SpinWait), но иногда передает управление другому потоку (Thread. Sleep).

потоки скоро (у меня в течении секунды) блокируются: все философы берут свою левую вилку, а правой нет. Но это решение не работает, т.к. Массив forks тогда имеет значения: 1 2 3 4 5.

Livelock

Зеленым цветом — выполнение, красным — синхронизация, серым — поток спит. На рисунке, блокирование потоков (deadlock). Ромбиками обозначено время запуска Task'ов.

Голод философов

Попробуем смоделировать ситуацию голодания потоков в нашей задаче. Хотя чтобы мыслить особенно много еды не нужно, но голод кого угодно заставить бросить философию. Для того, чтобы избежать частую блокировку будем класть вилку назад, если не смогли взять другую. Голодание — это когда поток работает, но без существенной работы, другими словами это тот же дедлок, только теперь поток не спит, а активно ищет как бы поесть, но еды нет.

// То же что и в RunDeadlock, но теперь кладем вилку назад и добавляем плохих философов.
private void RunStarvation(int i, CancellationToken token)
{ while (true) { bool hasTwoForks = false; var waitTime = TimeSpan.FromMilliseconds(50); // Плохой философов может уже иметь вилку: bool hasLeft = forks[Left(i)] == i + 1; if (hasLeft || TakeFork(Left(i), i + 1, waitTime)) { if (TakeFork(Right(i), i + 1, TimeSpan.Zero)) hasTwoForks = true; else PutFork(Left(i)); // Иногда плохой философ отдает вилку назад. } if (!hasTwoForks) { if (token.IsCancellationRequested) break; continue; } eatenFood[i] = (eatenFood[i] + 1) % (int.MaxValue - 1); bool goodPhilosopher = i % 2 == 0; // А плохой философ забывает положить свою вилку обратно: if (goodPhilosopher) PutFork(Left(i)); // А если и правую не положит, то хорошие будут вообще без еды. PutFork(Right(i)); Think(i); if (token.IsCancellationRequested) break; }
} // Теперь можно ждать определенное время.
bool TakeFork(int fork, int philosopher, TimeSpan? waitTime = null)
{ return SpinWait.SpinUntil( () => Interlocked.CompareExchange(ref forks[fork], philosopher, 0) == 0, waitTime ?? TimeSpan.FromMilliseconds(-1) );
}

И получается, что они едят больше еды, а другие начинают голодать, хотя у потоков одинаковый приоритет. В этом коде важно то, что два из четырех философа забывают положить свою левую вилку. плохие философы кладут свои вилки иногда назад. Здесь они не совсем голодают, т.к. Так небольшая ошибка в коде приводит к тому, что падает производительность. У меня получается, что хорошие едят где-то в 5 раз меньше, чем плохие. Эта ситуация тоже голодание, больше похожая на взаимную блокировку. Здесь еще стоит заметить, что возможна редкая ситуация, когда все философы берут левую вилку, правой нет, они кладут левую, ждут, опять берут левую и т.д. Ниже картинка для ситуации, когда два плохих философа забрали обе вилки, а два хороших голодают. Повторить ее у меня не получилось.

Starvation

Два ядра из четырех ничего не делают (зеленый график вверху). Здесь видно, что потоки просыпаются иногда и пробуют получить ресурс.

Смерть философа

Тогда соседи останутся без обеда. Ну и еще одна проблема, которая может прервать славный обед философов — это если один из них внезапно умрёт с вилками в руках (и его так и похоронят). И, между прочим, исключение будет не обработанным и вызывающий код его просто так не поймает (для этого AppDomain. Пример кода для этого случая вы можете придумать и сами, например выбрасывается NullReferenceException после того, как философ берет вилки. UnhandledException и др.). CurrentDomain. Поэтому обработчики ошибок необходимы в самих потоках и с корректным завершением.

Официант

Будем допускать только одного философа до вилок, добавим взаимное исключение (mutual exclusion) потоков для этого места. Хорошо, как нам решить эту проблему с взаимными блокировками, голоданием и смертями? Предположим, что рядом с философами стоит официант, который дает разрешение какому-нибудь одному философу взять вилки. Как это сделать? Как нам сделать этого официанта и как философы будут просить его, вопросы интересные.

Т.е. Простейший способ — это когда философы будут просто постоянно просить официанта дать доступ к вилкам. Сначала используем для этого только User Space, в нем мы не используем прерывания для вызова каких-нибудь процедур из ядра (о них ниже). теперь философы будут не ждать вилку рядом, а ждать или просить официанта.

Решения в пространстве пользователя

Но теперь это будут все философы и как бы только одна вилка, т.е. Здесь будем делать тоже, что раньше делали с одной вилкой и двумя философами, будем крутиться в цикле и ждать. Для этого используем SpinLock. можно сказать будет есть только тот философ, который взял эту "золотую вилку" у официанта.

private static SpinLock spinLock = new SpinLock(); // Наш "официант"
private void RunSpinLock(int i, CancellationToken token)
{ while (true) { // Взаимная блокировка через busy waiting. Вызываем до try, чтобы // выбрасить исключение в случае ошибки в самом SpinLock. bool hasLock = false; spinLock.Enter(ref hasLock); try { // Здесь может быть только один поток (mutual exclusion). forks[Left(i)] = i + 1; // Берем вилку сразу, без ожидания. forks[Right(i)] = i + 1; eatenFood[i] = (eatenFood[i] + 1) % (int.MaxValue - 1); forks[Left(i)] = 0; forks[Right(i)] = 0; } finally { if(hasLock) spinLock.Exit(); // Избегаем проблемы со смертью философа. } Think(i); if (token.IsCancellationRequested) break; }
}

Теперь он умеет считать ожидающих, немного усыплять их и мн. SpinLock это блокировщик, с, грубо говоря, тем же while(true) { if (!lock) break; }, но с еще большей "магией", чем в SpinWait (который там используется). В общем, делает всё возможное для оптимизации. др. Поэтому используем его только для очень очень коротких изменений в общей памяти, без всяких сторонних вызовов, вложенных блокировок и пр. Но надо помнить, что это всё тот же активный цикл, который ест ресурсы процессора и держит поток, который может привести к голоданию, если один из философов становиться приоритетнее других, но не имеет золотой вилки (Priority Inversion problem). сюрпризов.

SpinLock

Потоки постоянно "воюют" за золотую вилку. Рисунок для SpinLock. Ядра используются не полностью: только около 2/3 этими четырьмя потоками. Случаются провалы — на рисунке выделенная область.

CompareExchange с тем же активным ожиданием, как показано в коде выше (в голодающих философах), но это, как было уже сказано, теоретически может привести к блокировке. Другое решение здесь было бы использовать только Interlocked.

А через повтор изменения в случае, если другой поток успевает внести свои изменения (чтение 1, чтение 2, запись 2, запись 1 плохая), он может использоваться для сложных изменений одного значения (Interlocked Anything паттерн). Про Interlocked стоит сказать, что там не только CompareExchange, но и другие методы для атомарного чтения И записи.

Решения в режиме ядра

Другими словами, продолжая наш пример, посмотрим, как официант усыпит философа и разбудит его только тогда, когда надо. Чтобы избежать потери ресурсов в цикле, посмотрим как можно блокировать поток. Все структуры там часто оказываются медленнее, чем те, что в пространстве пользователя. Сначала рассмотрим, как это сделать через режим ядра операционной системы. Но с их помощью можно синхронизировать процессы по всей системе, управляемые или нет. Медленнее в несколько раз, например AutoResetEvent может быть в 53 раза медленнее SpinLock [Рихтер].

Семафор это, упрощенно говоря, положительное целое число, управляемое системой, и две операции на нем, — увеличить и уменьшить. Основная конструкция здесь это семафор, предложенный Дейкстрой более полувека назад. Когда число увеличивается каким-нибудь другим активным потоком/процессом, тогда потоки пропускаются, а семафор опять уменьшается на число прошедших. Если уменьшить не получается, ноль, то вызывающий поток блокируется. . Можно представить поезда в узком месте с семафором. Мы будем использовать AutoResetEvent, это самая простая из этих конструкций: только два значения 0 и 1 (false, true). NET предлагает несколько конструкций с подобными функциями: AutoResetEvent, ManualResetEvent, Mutex и сам Semaphore. А метод Set() повышает до 1 и пропускает одного ожидающего, который опять понижает до 0. Ее метод WaitOne() блокирует вызывающий поток, если значение было 0, а если 1, то понижает до 0 и пропускает его. Действует, как турникет в метро.

Т.е. Усложним решение и будем использовать блокировку для каждого философа, а не для всех сразу. Но мы опять блокируем доступ к столу, для того чтобы корректно, избегая гонок (race conditions), взять вилки. теперь есть могут сразу несколько философов, а не один.

// Для блокирования отдельного философа.
// Инициализируется: new AutoResetEvent(true) для каждого.
private AutoResetEvent[] philosopherEvents; // Для доступа к вилкам / доступ к столу.
private AutoResetEvent tableEvent = new AutoResetEvent(true); // Рождение философа.
public void Run(int i, CancellationToken token)
{ while (true) { TakeForks(i); // Ждет вилки. // Обед. Может быть и дольше. eatenFood[i] = (eatenFood[i] + 1) % (int.MaxValue - 1); PutForks(i); // Отдать вилки и разблокировать соседей. Think(i); if (token.IsCancellationRequested) break; }
} // Ожидать вилки в блокировке.
void TakeForks(int i)
{ bool hasForks = false; while (!hasForks) // Попробовать еще раз (блокировка не здесь). { // Исключающий доступ к столу, без гонок за вилками. tableEvent.WaitOne(); if (forks[Left(i)] == 0 && forks[Right(i)] == 0) forks[Left(i)] = forks[Right(i)] = i + 1; hasForks = forks[Left(i)] == i + 1 && forks[Right(i)] == i + 1; if (hasForks) // Теперь философ поест, выйдет из цикла. Если Set // вызван дважды, то значение true. philosopherEvents[i].Set(); // Разблокировать одного ожидающего. После него значение tableEvent в false. tableEvent.Set(); // Если имеет true, не блокируется, а если false, то будет ждать Set от соседа. philosopherEvents[i].WaitOne(); }
} // Отдать вилки и разблокировать соседей.
void PutForks(int i)
{ tableEvent.WaitOne(); // Без гонок за вилками. forks[Left(i)] = 0; // Пробудить левого, а потом и правого соседа, либо AutoResetEvent в true. philosopherEvents[LeftPhilosopher(i)].Set(); forks[Right(i)] = 0; philosopherEvents[RightPhilosopher(i)].Set(); tableEvent.Set();
}

Он ждет доступа к столу. Чтобы понять, что тут происходит, рассмотрим случай, когда философу не удалось взять вилки, тогда его действия будут такими. Не получилось. Получив его он пробует взять вилки. И проходит свой "турникет" (AutoResetEvent) (вначале они открыты). Он отдает доступ к столу (взаимное исключение). у него нет вилок. Попадает опять в цикл, т.к. Какой-нибудь более удачливый сосед справа или слева, закончив есть, разблокирует нашего философа, "открывая его турникет". Пробует взять их и останавливается у своего "турникета". Пробует в третий раз взять вилки. Наш философ проходит его (и он закрывается за ним) во второй раз. И проходит свой турникет, чтобы отобедать. Удачно.

Repeat), тогда философы будут ждать уже разработчиков, т.к. Когда в таком коде будут случайные ошибки (они всегда есть), например будет неверно указан сосед или создан один и тот же объект AutoResetEvent для всех (Enumerable. Ещё одна проблема с этим решением в том, что оно не гарантирует, что какой-нибудь философ не начнет голодать. поиск ошибок в таком коде довольно сложное занятие.

Гибридные решения

Первый метод хорош для кратких блокировок, второй для длительных. Мы рассмотрели два подхода к синхронизации, когда мы остаемся в режиме пользователя и крутимся в цикле и, когда мы блокируем поток через ядро. Этот подход реализован в т.н. Часто требуется сначала кратко ожидать изменения переменной в цикле, а потом заблокировать поток, когда ожидание долгое. Здесь есть те же конструкции, что были для режима ядра, но теперь с циклом в режиме пользователя: SemaphorSlim, ManualResetEventSlim и др. гибридных конструкциях. в C# есть известный всем lock синтаксис. Самая популярная конструкция здесь это Monitor, т.к. Посмотрим на решение с ним. Monitor это тот же семафор с максимальным значением 1 (мютекс), но с поддержкой ожидания в цикле, рекурсии, Condition Variable паттерна (о нем ниже) и др.

// Спрячем объект для Монитора от всех, чтобы без дедлоков.
private readonly object _lock = new object();
// Время ожидания потока.
private DateTime?[] _waitTimes = new DateTime?[philosophersAmount]; public void Run(int i, CancellationToken token)
{ while (true) { TakeForks(i); eatenFood[i] = (eatenFood[i] + 1) % (int.MaxValue - 1); PutForks(i); Think(i); if (token.IsCancellationRequested) break; }
} // Наше сложное условие для Condition Variable паттерна.
bool CanIEat(int i)
{ // Если есть вилки: if (forks[Left(i)] != 0 && forks[Right(i)] != 0) return false; var now = DateTime.Now; // Может, если соседи не более голодные, чем текущий. foreach(var p in new int[] {LeftPhilosopher(i), RightPhilosopher(i)}) if (_waitTimes[p] != null && now - _waitTimes[p] > now - _waitTimes[i]) return false; return true;
} void TakeForks(int i)
{ // Зайти в Монитор. То же самое: lock(_lock) {..}. // Вызываем вне try, чтобы возможное исключение выбрасывалось выше. bool lockTaken = false; Monitor.Enter(_lock, ref lockTaken); try { _waitTimes[i] = DateTime.Now; // Condition Variable паттерн. Освобождаем лок, если не выполненно // сложное условие. И ждем пока кто-нибудь сделает Pulse / PulseAll. while (!CanIEat(i)) Monitor.Wait(_lock); forks[Left(i)] = i + 1; forks[Right(i)] = i + 1; _waitTimes[i] = null; } finally { if (lockTaken) Monitor.Exit(_lock); }
} void PutForks(int i)
{ // То же самое: lock (_lock) {..}. bool lockTaken = false; Monitor.Enter(_lock, ref lockTaken); try { forks[Left(i)] = 0; forks[Right(i)] = 0; // Освободить все потоки в очереди ПОСЛЕ вызова Monitor.Exit. Monitor.PulseAll(_lock); } finally { if (lockTaken) Monitor.Exit(_lock); }
}

Т.е. Здесь мы опять блокируем весь стол для доступа к вилкам, но теперь мы разблокируем все сразу потоки, а не соседей, когда кто-нибудь заканчивает есть. его время ожидания меньше. сначала, кто-нибудь ест и блокирует соседей, а когда этот кто-нибудь заканчивает, но хочет сразу опять есть, он уходит в блокировку и будит своих соседей, т.к.

Используем цикл для краткого ожидания и блокируем поток для долгого. Так мы избегаем дедлоков и голодания какого-нибудь философа. потоки должны остаться в режиме пользователя сначала. Разблокировка сразу всех работает медленнее, чем если бы была разблокировка только соседа, как в решении с AutoResetEvent, но разница не должна быть большой, т.к.

Рекомендуют использовать Monitor напрямую [Рихтер] [Эрик Липперт]. У lock синтаксиса есть неприятные сюрпризы. В таких случаях чаще лучше уходить в дедлок или как-то безопасно завершать программу. Один из них в том, что lock всегда выходит из Monitor, даже если было исключение, и тогда другой поток может изменить состояние общей памяти. Поэтому, если выбран неподходящий объект, можно легко получить дедлок (например, если сделать лок на интернированную строку). Другой сюрприз в том, что Monitor использует блоки синхронизации (SyncBlock), которые есть во всех объектах. Используем всегда скрытый объект для этого.

В . Condition Variable паттерн позволяет более кратко реализовать ожидание какого-нибудь сложного условия. по идее там должны быть несколько очередей на нескольких переменных (как в Posix Threads), а не на одном локе. NET он неполный, на мой взгляд, т.к. Но и в таком виде он позволяет сократить код. Тогда можно было бы сделать их для всех философов.

Много философов или async / await

Но, а если у нас становиться много философов? Хорошо, теперь мы умеем эффективно блокировать потоки. 10000? 100? Создавать для каждого запроса поток будет накладным, т.к. Например, нам пришло 100000 запросов на веб-сервер. Будут выполняться только столько, сколько логических ядер (у меня 4). столько потоков параллельно не будут выполняться. Одно из решений этой проблемы async / await паттерн. А все остальные будут просто отнимать ресурсы. А когда, она это что-то происходит, она возобновляет свое выполнение (но не обязательно в том же потоке!). Идея его в том, что функция не держит поток, если для её продолжения нужно что-то подождать. В нашем случае, мы будем ждать вилки.

Вот реализация с использованием этого паттерна. SemaphoreSlim имеет для этого WaitAsync() метод.

// Запуск такой же, как раньше. Где-нибудь в программе:
Task.Run(() => Run(i, cancelTokenSource.Token)); // Запуск философа.
// Ключевое слово async -- компилятор транслирует этот метот в асинхронный.
public async Task Run(int i, CancellationToken token)
{ while (true) { // await -- будем ожидать какого-то события. await TakeForks(i); // После await, продолжение возможно в другом потоке. eatenFood[i] = (eatenFood[i] + 1) % (int.MaxValue - 1); // Может быть несколько событий для ожидания. await PutForks(i); Think(i); if (token.IsCancellationRequested) break; }
} async Task TakeForks(int i)
{ bool hasForks = false; while (!hasForks) { // Взаимоисключающий доступ к столу: await _tableSemaphore.WaitAsync(); if (forks[Left(i)] == 0 && forks[Right(i)] == 0) { forks[Left(i)] = i+1; forks[Right(i)] = i+1; hasForks = true; } _tableSemaphore.Release(); // Будем ожидать, чтобы сосед положил вилки: if (!hasForks) await _philosopherSemaphores[i].WaitAsync(); }
} // Ждем доступа к столу и кладем вилки.
async Task PutForks(int i)
{ await _tableSemaphore.WaitAsync(); forks[Left(i)] = 0; // "Пробудить" соседей, если они "спали". _philosopherSemaphores[LeftPhilosopher(i)].Release(); forks[Right(i)] = 0; _philosopherSemaphores[RightPhilosopher(i)].Release(); _tableSemaphore.Release();
}

Через него можно ждать завершения метода, отменять его и всё прочее, что можно делать с Task. Метод с async / await транслируется в хитрый конечный автомат, который сразу возвращает свой внутренний Task. Суть в том, что если нет задержки, то выполнение синхронное, а если есть, то поток освобождается. Внутри метода, конечный автомат, контролирует выполнение. Можно создавать цепочки из этих async / await методов. Для лучшего понимания этого лучше посмотреть этот конечный автомат.

Работа 100 философов на машине с 4 логическими ядрами, 8 секунд. Потестируем. Каждый из этих 4 потоков простаивал около 2мс. Предыдущее решение с Monitor выполняло только 4 первых потока, а остальные вообще не выполнялись. 8 секунд. А решение с async / await выполняло все 100, при этом в среднем каждый ждал 6. Решение же с Monitor оказалось не масштабируемым вообще. Конечно, в реальных системах простаивание по 6 секунд неприемлемо и лучше не обрабатывать столько запросов так.

Заключение

NET поддерживает много конструкций синхронизации. Как видно из этих небольших примеров, . Надеюсь эта статья оказалась полезной. Не всегда, впрочем, очевидно, как их использовать. Пока на этом завершаем, но осталось еще много интересного, например потокобезопасные коллекции, TPL Dataflow, Reactive программирование, Software Transaction модель и др.

Источники

Показать больше

Похожие публикации

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *

Кнопка «Наверх»