Главная » Хабрахабр » [Перевод] Распараллеливание задач с зависимостями —  пример на .NET

[Перевод] Распараллеливание задач с зависимостями —  пример на .NET

Здравствуйте, коллеги!

NET" издательства Manning: На этой неделе мы отдали в перевод амбициозную по своей сложности книгу "Concurrency in .

Такие проблемы можно решать при помощи последовательного и императивного выполнения, но, если вы хотите добиться максимальной производительности, то последовательные операции вам не подойдут. Автор любезно выложил на сайте Medium отрывок из 13-й главы, который мы и предлагаем оценить задолго до премьеры.
Приятного чтения!
Допустим, вам нужно написать инструмент, позволяющий выполнять ряд асинхронных задач, у каждой из которых – свой набор зависимостей, влияющих на порядок выполнения операций. Многие конкурентные проблемы можно трактовать как статические коллекции атомарных операций с зависимостями между их входными и выходными данными. Вместо этого нужно организовать параллельное выполнение задач. Для оптимизации производительности задачи нужно назначать, исходя из их зависимостей, и алгоритм должен быть настроен так, чтобы зависимые задачи выполнялись настолько последовательно, насколько это необходимо, и настолько параллельно, насколько это возможно. По завершении операции ее вывод используется в качестве ввода для других, зависимых операций.

Как построить такую модель программирования, которая бы обеспечивала базовый параллелизм коллекции операций, выполняемых эффективно, либо параллельно, либо последовательно, смотря какие зависимости возникают между данной операцией и другими? Вы хотите сделать многоразовый компонент, параллельно выполняющий серию задач, и гарантировать, что будут учтены все зависимости, которые могли бы повлиять на порядок выполняемых операций.

Решение: Реализуем граф зависимостей при помощи класса MailboxProcessor из F# и предоставляем методы как стандартные задачи (Task), чтобы их можно было потреблять и из C#

В данном случае важна ациклическая суть графа, поскольку она устраняет возможность взаимных блокировок между задачами, при условии, что задачи на самом деле полностью атомарны. Такое решение называется «Ориентированный ациклический граф» (DAG) и призвано сформировать граф, дробя операции на последовательности атомарных задач с четко определенными зависимостями. Ниже приведен типичный пример графовидной структуры данных, с помощью которой можно представить ограничения, возникающие при планировании взаимодействий между операциями в данном графе. Задавая граф, важно понимать все зависимости между задачами, особенно неявные зависимости, которые могут приводить к взаимным блокировкам или условиям гонки.

Граф – исключительно мощная структура данных, и на ее основе можно писать сильные алгоритмы.

1 Граф – это совокупность вершин, соединенных ребрами. Рис. В этом представлении ориентированного графа узел 1 зависит от узлов 4 и 5, узел 2 зависит от узла 5, узел 3 зависит от узлов 5 и 6 и т.д.

Структуру такого графа можно определить при помощи класса MailboxProcessor из языка F#; в данном классе сохраняется внутреннее состояние для задач, зарегистрированных для выполнения в форме зависимостей ребер. Структура DAG применима в качестве стратегии для параллельного выполнения задач с учетом порядка зависимостей, что позволяет повысить производительность.

Валидация ориентированного ациклического графа

Например, возвращаясь к рисунку 1: что будет, если у нас зарегистрирован узел 2 с зависимостями от узла 5, а узла 5 не существует? При работе с любой графовой структурой данных, например DAG, приходится заботиться о правильной регистрации ребер. При наличии ориентированного цикла критически важно выполнять некоторые задачи параллельно; в противном случае некоторые задачи могут вечно дожидаться выполнения других, и возникнет взаимная блокировка. Также может случиться, что некоторые ребра зависят друг от друга, из-за чего возникает ориентированный цикл.

Так, если задача A должна завершиться до задачи B, а задача B – до задачи C, которая, в свою очередь, должна завершиться до задачи A, то возникает циклическая ссылка, и система уведомит вас об этой ошибке, выбросив исключение. Задача решается при помощи топологической сортировки: это означает, что мы можем упорядочить все вершины графа таким образом, чтобы любое ребро вело от вершины с меньшим номером к вершине с большим номером. Проверка такого рода называется «обнаружение цикла в ориентированном графе». Если при управлении очередностью возникает ориентированный цикл, то решения нет. Если ориентированный граф удовлетворяет описанным правилам, то является ориентированным ациклическим графом, отлично подходящим для параллельного запуска нескольких задач, между которыми существуют зависимости.

Полная версия листинга 2, содержащая код валидации DAG, есть в исходном коде, выложенном онлайн.

Сначала давайте определим меченое объединение, при помощи которого будем управлять задачами и выполнять их зависимости. В следующем листинге класс MailboxProccessor из F# используется как идеальный кандидат для реализации DAG, обеспечивающего параллельное выполнение операций, связанных зависимостями.

Листинг 1 Тип сообщения и структура данных для координации выполнения задач в соответствии с их зависимостями

type TaskMessage = // #A
| AddTask of int * TaskInfo
| QueueTask of TaskInfo
| ExecuteTasks
and TaskInfo = // #B

#A посылает к ParallelTasksDAG базовый агент dagAgent, отвечающий за координацию выполнения задач

#B Обертывает детали каждой задачи для выполнения

Эти сообщения используются для координации задач и синхронизации зависимостей. Тип TaskMessage представляет оболочки сообщений, отправляемых базовому агенту типа ParallelTasksDAG. Контекст выполнения (https://msdn.microsoft.com/en-us/library/system.threading.executioncontext(v=vs. Тип TaskInfo содержит и отслеживает подробности зарегистрированных задач в процессе выполнения DAG, в том числе, ребра зависимостей. После срабатывания события публикуется время начала и завершения выполнения. 110).aspx) захватывается с целью обращения к информации при отложенном выполнении, например, такой информации: текущий пользователь, любое состояние, ассоциированное с логическим потоком выполнения, информация о безопасном доступе к коду и т.п.

Листинг 2 Агент DAG на F# для распараллеливания выполнения операций, связанных зависимостями

type ParallelTasksDAG() = let onTaskCompleted = new Event<TaskInfo>() // #A let dagAgent = new MailboxProcessor<TaskMessage>(fun inbox -> let rec loop (tasks : Dictionary<int, TaskInfo>) // #B (edges : Dictionary<int, int list>) = async { // #B let! msg = inbox.Receive() // #C match msg with | ExecuteTasks -> // #D let fromTo = new Dictionary<int, int list>() let ops = new Dictionary<int, TaskInfo>() // #E for KeyValue(key, value) in tasks do // #F let operation = { value with EdgesLeft = Some(value.Edges.Length) } for from in operation.Edges do let exists, lstDependencies = fromTo.TryGetValue(from) if not <| exists then fromTo.Add(from, [ operation.Id ]) else fromTo.[from] <- (operation.Id :: lstDependencies) ops.Add(key, operation) ops |> Seq.iter (fun kv -> // #F match kv.Value.EdgesLeft with | Some(n) when n = 0 -> inbox.Post(QueueTask(kv.Value)) | _ -> ()) return! loop ops fromTo | QueueTask(op) -> // #G Async.Start <| async { // #G let start = DateTimeOffset.Now match op.Context with // #H | null -> op.Task.Invoke() |> Async.AwaitATsk | ctx -> ExecutionContext.Run(ctx.CreateCopy(), // #I (fun op -> let opCtx = (op :?> TaskInfo) opCtx.Task.Invoke().ConfigureAwait(false)), taskInfo) let end’ = DateTimeOffset.Now onTaskCompleted.Trigger { op with Start = Some(start) End = Some(end’) } // #L let exists, deps = edges.TryGetValue(op.Id) if exists && deps.Length > 0 then let depOps = getDependentOperation deps tasks [] edges.Remove(op.Id) |> ignore depOps |> Seq.iter (fun nestedOp -> inbox.Post(QueueTask(nestedOp))) } return! loop tasks edges | AddTask(id, op) -> tasks.Add(id, op) // #M return! loop tasks edges } loop (new Dictionary<int, TaskInfo>(HashIdentity.Structural)) (new Dictionary<int, int list>(HashIdentity.Structural))) [<CLIEventAttribute>] member this.OnTaskCompleted = onTaskCompleted.Publish // #L
member this.ExecuteTasks() = dagAgent.Post ExecuteTasks // #N
member this.AddTask(id, task, [<ParamArray>] edges : int array) = let data = { Context = ExecutionContext.Capture() Edges = edges; Id = id; Task = task NumRemainingEdges = None; Start = None; End = None } dagAgent.Post(AddTask(id, data)) // #O

#A Экземпляр класса onTaskCompletedEvent, используется для уведомления о завершении задачи

Коллекции изменяемы, поскольку в ходе выполнения ParallelTasksDAG состояние меняется, и поскольку они унаследовали потокобезопасность, так как находятся в Agent #B Внутреннее состояние агента для отслеживания регистров задач и их зависимостей.

#C Асинхронно ожидаем выполнения

#D Оболочка сообщения, запускающего ParallelTasksDAG

#E Коллекция, отображаемая на монотонно увеличиваемый индекс с задачей к запуску

#F Процесс перебирает список задач, анализируя зависимости с другими задачами для создания топологической структуры, в которой представлен порядок выполнения задач

#G Оболочка сообщения для постановки задачи в очередь, ее выполнения и, в конечном итоге, для удаления этой задачи из состояния агента в качестве активной зависимости после того как задача завершится

#H Если подхваченный ExecutionContext равен null, то запускаем задачу в текущем контексте, в противном случае переходим к #I

#I Запускаем задачу в перехваченном ExecutionContext

В событии содержится информация о задаче #L Инициируем и публикуем событие onTaskCompleted, чтобы дать уведомление о завершении задачи.

#M Оболочка сообщения для добавления задачи на выполнение в соответствии с ее зависимостями, если таковые имеются

#N Запускает выполнение зарегистрированных задач

#O Добавление задачи, ее зависимостей и текущего ExecutionContext для выполнения DAG.

Эта функция принимает уникальный ID, задачу, которая должна быть выполнена, и множество ребер, представляющих ID всех других зарегистрированных задач, которые должны быть выполнены прежде, чем можно будет приступать к выполнению данной задачи. Цель функции AddTask – зарегистрировать задачу с произвольными ребрами зависимостей. Экземпляр MailboxProcessor под названием dagAgent хранит зарегистрированные задачи в актуальном состоянии “tasks,” представляющим собой словарь (tasks : Dictionary<int, TaskInfo>), соотносящий ID каждой задачи и ее подробности. Если массив пуст, это означает, что зависимостей нет. Когда агент получает уведомление о необходимости приступить к выполнению, в рамках этого процесса проверяется, чтобы все зависимости ребер были зарегистрированы, и чтобы в графе не было циклов. Более того, в Agent также хранится состояние зависимостей ребер по ID каждой задачи (edges : Dictionary<int, int list>). Далее предлагаю пример на C#, где ссылаюсь на библиотеку that F# для запуска ParallelTasksDAG (и потребляю ее). Данный этап верификации доступен а полной реализации ParallelTasksDAG, приведенной в онлайновом коде. 1. Зарегистированные задачи отражают зависимости, представленные выше на рис.

Func<int, int, Func<Task>> action = (id, delay) => async () => { Console.WriteLine($”Starting operation{id} in Thread Id {Thread.CurrentThread.ManagedThreadId}…”); await Task.Delay(delay);
};
var dagAsync = new DAG.ParallelTasksDAG();
dagAsync.OnTaskCompleted.Subscribe(op => Console.WriteLine($”Operation {op.Id} completed in Thread Id { Thread.CurrentThread.ManagedThreadId}”));
dagAsync.AddTask(1, action(1, 600), 4, 5);
dagAsync.AddTask(2, action(2, 200), 5);
dagAsync.AddTask(3, action(3, 800), 6, 5);
dagAsync.AddTask(4, action(4, 500), 6);
dagAsync.AddTask(5, action(5, 450), 7, 8);
dagAsync.AddTask(6, action(6, 100), 7);
dagAsync.AddTask(7, action(7, 900));
dagAsync.AddTask(8, action(8, 700));
dagAsync.ExecuteTasks();

Цель вспомогательной функции – вывести сообщение о том, что началось выполнение задачи, сославшись при этом на Id актуального потока для подтверждения многопоточности. С другой стороны, событие OnTaskCompleted регистрируется для выдачи уведомления о завершении каждой задачи с выводом в консоль ID задачи и Id актуального потока. Вот вывод, который мы получим при вызове метода ExecuteTasks.

Starting operation 8 in Thread Id 23…
Starting operation 7 in Thread Id 24…
Operation 8 Completed in Thread Id 23
Operation 7 Completed in Thread Id 24
Starting operation 5 in Thread Id 23…
Starting operation 6 in Thread Id 25…
Operation 6 Completed in Thread Id 25
Starting operation 4 in Thread Id 24…
Operation 5 Completed in Thread Id 23
Starting operation 2 in Thread Id 27…
Starting operation 3 in Thread Id 30…
Operation 4 Completed in Thread Id 24
Starting operation 1 in Thread Id 28…
Operation 2 Completed in Thread Id 27
Operation 1 Completed in Thread Id 28
Operation 3 Completed in Thread Id 30

Как видите, задачи выполняются параллельно в разных потоках (ID потока у них отличаются), и порядок зависимостей у них сохраняется.

Подробнее читайте в книге Concurrency in . В сущности, вот так и распараллеливаются задачи, имеющие зависимости. NET.


Оставить комментарий

Ваш email нигде не будет показан
Обязательные для заполнения поля помечены *

*

x

Ещё Hi-Tech Интересное!

[Из песочницы] Валидация сложных форм React. Часть 1

Для начала надо установить компонент react-validation-boo, предполагаю что с react вы знакомы и как настроить знаете. npm install react-validation-boo Чтобы много не болтать, сразу приведу небольшой пример кода. import React, from 'react'; import {connect, Form, Input, logger} from 'react-validation-boo'; class ...

[Перевод] Микросервисы на Go с помощью Go kit: Введение

Эта статья — введение в Go kit. В этой статье я опишу использование Go kit, набора инструментов и библиотек для создания микросервисов на Go. Первая часть в моем блоге, исходный код примеров доступен здесь. Когда вы разрабатываете облачно-ориентированную распределенную систему, ...