Хабрахабр

[Из песочницы] Поддержка очередей в Hangfire

Примером такого кода может быть отправка E-Mail, обработка видео, синхронизация с другой системой и т.д. Hangfire — это библиотека для .net (core), позволяющая асинхронно выполнять некоторый код по принципу "fire and forget". Помимо "fire and forget" есть поддержка отложенных задач, а также задач по расписанию в формате Cron.

Несколько преимуществ, говорящих в пользу Hangfire: В настоящее время существует масса подобных библиотек.

  • Простая конфигурация, удобный API
  • Надежность. Hangfire гарантирует, что созданная задача будет выполнена хотя бы один раз
  • Возможность параллельного выполнения задач и отличная производительность
  • Расширяемость (вот ей-то мы и воспользуемся ниже)
  • Достаточно полная и понятная документация
  • Dashboard, на котором можно видеть всю статистику о задачах

В этой статье я разберу, как воспользоваться поддержкой нескольких очередей (или пулов задач), как починить стандартную retry-функциональность и сделать так, чтобы каждая очередь имела индивидуальную конфигурацию. Не буду слишком вдаваться в детали, поскольку существует немало хороших статей о Hangfire и способах его применения.

Существующая поддержка (псевдо)-очередей

Т.е. Важное замечание: в заголовке я использовал термин псевдо-очередь, потому что Hangfire не гарантирует выполнение задач в определенном порядке. Более того автор библиотеки рекомендует делать задачи идемпотентными, т.е. принцип "First In First Out" не действует и мы не будем на него опираться. Далее я буду использовать просто слово "очередь", т.к. устоичивыми к непредвиденному многократному выполнению. в Hangfire используется термин "Queue".

Хотя он и не предлагает гибкости Message Queue Systems, таких как rabbitMQ или Azure Service Bus, но этого часто вполне достаточно для решения широкого спектра задач. В Hangfire заложена простая поддержка очередей.

По умолчанию, задача отправляется в очередь с именем "default", если не указано иное. Каждая задача имеет свойство "Queue", то есть имя очереди, в которой она должна выполняться. Например, мы можем захотеть, чтобы задачи по обработке видео попадали в очередь "video_queue", а рассылка E-Mail'ов в очередь "email_queue". Поддержка нескольких очередей нужна для того, чтобы раздельно управлять выполнением задач разных типов. Если мы захотим вынести обработку видео на выделенный сервер, то мы легко сможем это сделать, запустив отдельный Hangfire-сервер как консольное приложение, которое будет обрабатывать очередь "video_queue". Таким образом мы получаем возможность независимо выполнять эти два типа задач.

Перейдем к практике

Настройка Hangfire-сервера в asp.net core выглядит следующим образом:

public void Configure(IApplicationBuilder app)
});
}

Проблема 1

Если задача, положенная в очередь, например, "video_queue", завершилась с ошибкой и нуждается в повторе, то на повторное выполнение она будет отправлена в очередь "default", а не "video_queue" и, как следствие, наша задача будет выполняться совсем не тем экземпляром Hangfire-сервера, которым нам бы хотелось, если вообще будет. Как я уже упоминал выше, в Hangfire существует очередь по умолчанию, которая называется "default". Такое поведение было мной установлено опытным путем и возможно является багом в самом Hangfire.

Job Filters

NET MVC. Hangfire предоставляет нам возможность расширения функционала с помощью так называемых фильтров (Job Filters), которые по принципу работы похожи на Actions Filters в ASP. Это движок, который поочередно переводит имеющиеся в пуле задачи из одного состояния в другое (например, created -> enqueued -> processing -> succeeded), а фильтры позволяют нам "перехватывать" выполняемую задачу при каждом изменении её состояния и производить манипуляции с ней. Дело в том, что внутренняя логика Hangfire реализована как State Machine. Фильтр реализуется как аттрибут, который может быть применен к отдельному методу, классу или глобально.

Job Parameters

Этот объект содержит полную информацию о выполняемой в данный момент задаче. В качестве аргумента в метод фильтра передается объект ElectStateContext. Job Parameters позволяют сохранять связанную с задачей информацию в базе данных. Среди прочего он имеет методы GetJobParameter<>(...) и SettJobParameter<>(...). Именно в Job Parameters и хранится имя очереди, в которую была изначально отправлена задача, только почему-то эта информация игнорируется при последующем повторе.

Решение

Повторение завершившейся с ошибкой задачи — это переход из состояния "failed" в состояние "enqueued". Итак, у нас есть задача, которая завершилась с ошибкой и должна быть отправлена на повторное выполнение в нужную очередь (в ту самую, которая была ей присвоена в момент первоначального создания). Для решения проблемы создадим фильтр, который при переходе задачи в состояние "enqueued", будет проверять в какую очередь задача была отправлена изначально и проставлять параметр "QueueName" в нужное значение:

public class HangfireUseCorrectQueueFilter : JobFilterAttribute, IElectStateFilter
{ public void OnStateElection(ElectStateContext context) { if (context.CandidateState is EnqueuedState enqueuedState) { var queueName = context.GetJobParameter<string>("QueueName"); if (string.IsNullOrWhiteSpace(queueName)) { context.SetJobParameter("QueueName", enqueuedState.Queue); } else { enqueuedState.Queue = queueName; } } }
}

Для того, чтобы применть фильтр по умолчанию ко всем задачам (то есть глобально), добавим следующий код в нашу конфигурацию:

GlobalJobFilters.Filters.Add(new HangfireUseCorrectQueueFilter { Order = 1 });

Это стандартный фильтр, который отвечает за повторное выполнение неудачно завершенных задач. Еще одна небольшая загвоздка заключается в том, что коллекция GlobalJobFilters по умолчанию содержит экземпляр класса AutomaticRetryAttribute. Для того, чтобы наш велосипед поехал нужно удалить этот фильтр из коллекции и позволить нашему фильтру взять на себя ответственность за повторное выполнение задач. Он же и отправляет задачу в очередь "default", игнорируя изначальную очередь. В результате конфигурационный код будет выглядеть так:

var defaultRetryFilter = GlobalJobFilters.Filters .FirstOrDefault(f => f.Instance is AutomaticRetryAttribute); if (defaultRetryFilter != null && defaultRetryFilter.Instance != null)
{ GlobalJobFilters.Filters.Remove(defaultRetryFilter.Instance);
} GlobalJobFilters.Filters.Add(new HangfireUseCorrectQueueFilter { Order = 1 });

реализацию метода ScheduleAgainLater) Необходимо отметить, что AutomaticRetryAttribute реализует логику автоматического увеличения интервала между попытками (с каждой последующей попыткой интервал увеличивается), и удаляя AutomaticRetryAttribute из коллекции GlobalJobFilters, мы отказываемся от этой функциональности (см.

Только теперь мы не знаем сколько раз и с каким интервалом наши задачи будут повторяться в случае ошибки, поскольку удалили AutomaticRetryAttribute из коллекции фильтров. Итак, мы добились того, что наши задачи могут выполняться в разных очередях и это позволяет нам независимо управлять их выполнением, в том числе обрабатывать разные очереди на разных машинах.

Проблема 2

Для этого мы реализуем еще один фильтр и назовем его HangfireRetryJobFilter. Мы хотим иметь возможность конфигурировать интервал и количество повторений отдельно для каждой очереди, а также, если для какой-то очереди мы не указали значения явно, то хотим, чтобы применялись значения по умолчанию.

В идеале, конфигурационный код должен выглядть примерно так:

GlobalJobFilters.Filters.Add(new HangfireRetryJobFilter
{ Order = 2, ["email_queue"] = new HangfireQueueSettings { DelayInSeconds = 120, RetryAttempts = 3 }, ["video_queue"] = new HangfireQueueSettings { DelayInSeconds = 60, RetryAttempts = 5 }
});

Решение

Для этого сначала добавим класс HangfireQueueSettings, который будет служить контейнером для наших настроек.

public sealed class HangfireQueueSettings
{ public int RetryAttempts { get; set; } public int DelayInSeconds { get; set; }
}

Затем добавим реализацию самого фильтра, который при повторном выполнении задач после ошибки будет применять настройки в зависимости от конфигурации очереди и следить за количеством повторов:

public class HangfireRetryJobFilter : JobFilterAttribute, IElectStateFilter, IApplyStateFilter
{ private readonly HangfireQueueSettings _defaultQueueSettings = new HangfireQueueSettings { RetryAttempts = 3, DelayInSeconds = 10 }; private readonly IDictionary<string, HangfireQueueSettings> _settings = new Dictionary<string, HangfireQueueSettings>(); public HangfireQueueSettings this[string queueName] { get { return _settings.TryGetValue(queueName, out HangfireQueueSettings queueSettings) ? queueSettings : _defaultQueueSettings; } set { _settings[queueName] = value; } } public void OnStateElection(ElectStateContext context) { if (!(context.CandidateState is FailedState failedState)) { // This filter accepts only failed job state. return; } var retryAttempt = context.GetJobParameter<int>("RetryCount") + 1; var queueName = context.GetJobParameter<string>("QueueName"); if (retryAttempt <= this[queueName].RetryAttempts) { ScheduleAgainLater(context, retryAttempt, failedState, queueName); } else { TransitionToDeleted(context, failedState, queueName); } } public void OnStateApplied( ApplyStateContext context, IWriteOnlyTransaction transaction) { if (context.NewState is ScheduledState && context.NewState.Reason != null && context.NewState.Reason.StartsWith("Retry attempt")) { transaction.AddToSet("retries", context.BackgroundJob.Id); } } public void OnStateUnapplied( ApplyStateContext context, IWriteOnlyTransaction transaction) { if (context.OldStateName == ScheduledState.StateName) { transaction.RemoveFromSet("retries", context.BackgroundJob.Id); } } private void ScheduleAgainLater( ElectStateContext context, int retryAttempt, FailedState failedState, string queueName) { context.SetJobParameter("RetryCount", retryAttempt); var delay = TimeSpan.FromSeconds(this[queueName].DelayInSeconds); const int maxMessageLength = 50; var exceptionMessage = failedState.Exception.Message.Length > maxMessageLength ? failedState.Exception.Message.Substring(0, maxMessageLength - 1) + "…" : failedState.Exception.Message; // If attempt number is less than max attempts, we should // schedule the job to run again later. var reason = $"Retry attempt {retryAttempt} of {this[queueName].RetryAttempts}: {exceptionMessage}"; context.CandidateState = delay == TimeSpan.Zero ? (IState)new EnqueuedState { Reason = reason } : new ScheduledState(delay) { Reason = reason }; } private void TransitionToDeleted( ElectStateContext context, FailedState failedState, string queueName) { context.CandidateState = new DeletedState { Reason = this[queueName].RetryAttempts > 0 ? "Exceeded the maximum number of retry attempts." : "Retries were disabled for this job." }; }
}

Примечание к коду: при реализации класса HangfireRetryJobFilter был взят за основу класс AutomaticRetryAttribute из Hangfire, поэтому реализация некоторых методов частично совпадает с соответствующими методами этого класса.

Заключение

Мы реализовали свой механизм повторения неудачно завершившихся задач с возможностью индивидуальной конфигурации для каждой очереди, расширив функциональность Hangfire с помощью Job Filters. В этой статье мы воспользовались поддержкой нескольких очередей в Hangfire для обработки задач разных типов.

Буду рад комментариям. Надеюсь, эта статья окажется кому-нибудь полезной.

Полезные ссылки

NET Документация Hangfire
Исходный код Hangfire
Scott Hanselman — How to run Background Tasks in ASP.

Теги
Показать больше

Похожие статьи

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *

Кнопка «Наверх»
Закрыть