Хабрахабр

[Из песочницы] Обрабатываем заказы из интернет магазина с помощью RabbitMQ и TypeScript

Популярность интернет коммерции постоянно растет, как и доля информатизации всех смежных с торговлей видов деятельности. Всем привет! Каждый заказ, сделанный клиентом интернет магазина, порождает за собой большое количество интеграций с различными сервисами. Вместе с этим растет и сложность обработки информации. Каждый заказ должен быть оплачен, учтен, собран и доставлен, а также доступен для последующего анализа. Такими сервисами могут быть сервисы обработки платежей, доставки, системы учета и лояльности. Отклик от интернет магазина должен быть быстрым, ведь каждая миллисекунда задержки увеличивает шанс потери клиента, а в последствии и прибыли. Эту, и так не простую ситуацию, усложняет и тот факт, что пользователь интернет магазина не хочет долго и мучительно чего-то ждать при оформлении заказа. Добро пожаловать под кат. В этой статье я хочу рассказать про брокер сообщений RabbitMQ и как с его помощью можно организовать процесс обработки заказов используя Node.js и TypeScript.

Необходимая теория

Брокер сообщений необходим для связи разных компонентов системы в единое целое, как клей необходим для реанимации разбитой вазы. Думаю, многие слышали про RabbitMQ, ведь первая open source версия этого брокера сообщений, основанного на протоколе AMQP, была выпущена аж в 2007 году. Как раз такая асинхронная обработка заказов и нужна интернет магазину. С помощью брокера сообщений можно реализовать асинхронную обработку событий, поступающих в систему. У этого брокера есть три основных компонента, с помощью которых мы будем выстраивать процесс обработки: Но для начала необходимо разобраться с основными компонентами RabbitMQ.

  • Message. Это минимальная единица информации в рамках брокера сообщений и нашего сервиса обработки, которая может быть обработана. Сам RabbitMQ хранит сообщения в бинарном виде, но для нашей системы и для статьи это не важно. Сообщения мы будем принимать и обрабатывать в виде JSON. Так же стоит упомянуть, что сообщения в RabbitMQ имеют заголовки. Они схожи с заголовками http запросов. Это ассоциативный массив, в который можно записать необходимую информацию.
  • Message queue. Это очередь, в которой RabbitMQ хранит сообщения. На очередь сообщений могут быть подписаны один или несколько потребителей (consumer). Каждое сообщение из очереди rabbit распределяет по потребителям, используя алгоритм round-robin.
  • Exchange. Это, как понятно из названия, точка обмена. К этой точке могут быть привязаны очереди или другие обменники. Точка обмена не хранит сообщения, основная ее функция — это маршрутизация сообщений в одну или несколько очередей, или такие же точки обмена. Каждая очередь или обменник привязывается по ключу маршрутизации (routing key). В RabbitMQ есть несколько разных типов обменников, которые влияют на то, как именно exchange будет маршрутизировать поступившее в него сообщение.

Ключ маршрутизации есть как у привязки (binding) очереди к обменнику, так и у самого сообщения. Для того, чтобы описать, как работают разные типы обменников, необходимо разобраться, что из себя представляют ключи маршрутизации. Каждый блок разделен точкой. Routing key -это просто строка, поделенная на блоки. При этом для ключа маршрутизации сообщения можно задавать шаблоны с использованием специальных символов # и *. Например, “notify.sendEmail.sendSms”. Например “notify.sendSms.*” или “notify.#”. * — говорит что после точки может идти один любой блок, а вот после # может идти любое количество блоков. Теперь можно переходить к типам точек обмена.

Есть четыре типа обменников:

  • Fanout. Логика маршрутизации данного exchange'a проста, он перенаправляет поступившее сообщение во все очереди или обменники, которые привязаны к нему.

  • Direct. Этот exchange перенаправляет сообщение в зависимости от того, совпадает ли routing key сообщения с routing key привязки.

  • Topic. Exchange этого типа также как и Direct маршрутизирует сообщение в зависимости от routing key. Но в качестве ключа маршрутизации может выступать шаблон.

  • Headers. Этот exchange, в отличие от остальных, использует для маршрутизации заголовки сообщений. При этом очереди к обменнику биндятся также с помощью ассоциативного массива. Логику, по которой обменник будет маршрутизировать сообщения, можно менять с помощью специального ключа “x-match“, который задается в ассоциативном массиве привязки. Ключу можно задать два значения all или any. Если значение all, то заголовки сообщения должны полностью совпадать с ассоциативным массивом привязки, если значение any, то значение должно совпадать хотя бы у одного ключа.

Более подробно об этих компонентах можно почитать в спецификации протокола AMQP. Это основные компоненты RabbitMQ. Далее мы будем проектировать и реализовывать систему обработки заказов на примере TypeScript, попутно разбираясь с настройками каждого компонента.

Проектирование

Для упрощения примера будем считать, что для успешной обработки интернет заказа у нас должен быть следующий функционал:

  • Сохранять поступающие заказы
  • Отправлять sms клиенту с номером заказа, а также статусом заказа
  • Отправлять сообщение в службу курьерской доставки о новом заказе из нашего интернет магазина, если клиент выбрал этот способ доставки

Например, уведомлять клиента по email или предоставлять на выбор несколько способов доставки заказа. Но мало реализовать этот функционал, ведь наш интернет магазин планирует расширять функционал и предоставлять в дальнейшем больше разных возможностей своим клиентам (а такое происходит всегда). Из этого следует, что нам нужно спроектировать систему таким образом, чтобы добавлять функционал было просто.

Про этот шаблон можно почитать тут Также стоит упомянуть, что я буду использовать шаблон отложенных сообщений для того, чтобы была возможность, при недоступности внешнего сервиса, повторять логику несколько раз.

Чтобы более наглядно представить конечную цель, я нарисую схему.

Схема разбита на блоки и разные цвета. Давайте разберемся по порядку, как устроен процесс обработки заказа на этой схеме. Блоки серого цвета обозначают элементы RabbitMQ. Блоки белого цвета обозначают внешние сервисы, которые мы рассматривать не будем. Зеленым цветом отражены блоки бизнес логики, которые необходимо реализовывать. Очереди и обменники. Цифры обозначают процесс и подпроцесс в соответствии с порядком. Также каждый блок, имеющий отношение к нашей логике, пронумерован.

После этого мы должны присвоить номер заказу, сохранить заказ в базе данных со статусом “новый” и отправить ответ об успешном создании заказа, с его номером, обратно. Первым делом, сообщение по HTTP API попадает в наш сервис. Отправив положительный ответ, мы отправляем объект заказа в exchange постобработки, из которого он попадает в worker формирования routing key. Клиент, получив сообщение об успешном создании заказа, идет заниматься своими делами. Сформировав routing key, worker отправляет сообщение обратно в exchange постобработки, но теперь ключ маршрутизации у заказа изменился и обменник может отправить его уже по нужному маршруту. Этот воркер, получив объект заказа из очереди, на его основе (есть ли в заказе email или телефон клиента, какой способ доставки был выбран) должен сформировать ключ маршрутизации заказа. А далее по такой же логике в очереди и воркеры. В зависимости от ключа, заказ может быть отправлен в exchange, который отвечает за уведомления, exchange интеграций или сразу в оба.

Количество таких попыток можно передать в переменной окружения. Воркеры отправки sms и службы доставки будут пытаться обработать сообщение несколько раз. Поэтому после превышения количества допустимых попыток сообщение будет удаляться из очередей и отправляться в хранилище ошибок, из которого его можно будет повторно отправить обратно на нужный уровень обработки. Но бесконечно обрабатывать сообщение не стоит, ведь ошибка может крыться в самом сообщении или логике воркера.

Реализация

Я рекомендую использовать для этой цели docker и официальный образ брокера. Для проверки реализации потребуется сам rabbit. Установить и запустить контейнер можно следующей командой.

docker run -d --name rabbit -p 5672:5672 -e rabbitmq:3.7.15-management-alpine

Это образ с web интерфейсом, доступным на порту 15672, для удобной отладки.

Опишем интерфейсы заказа и сообщений, которые мы будем отправлять в rabbit. Реализовывать задуманное будем с помощью TypeScript и библиотеки amqplib (реализация клиента RabbitMQ для Node.js) поэтому для начало необходимо описать несколько интерфейсов.

// Интерфейс товара заказа
export interface Product { id: string; name: string; price: number;
} // Общий интерфейс заказа
export interface Order { clientName: string; city: string; email?: string; phone?: string; products: Product[]; totalSum: number; deliveryAddress?: string;
} // Интерфейс заказа у которого есть номер телефона клиента
export interface OrderWithPhone extends Order { phone: string;
}
// Интерфейс заказа у которого есть адрес доставки
export interface OrderWithDeliveryAddress extends Order { deliveryAddress: string;
} // Types Guard'ы для определения какой заказ к нам пришел
export const isOrderWithPhone = (order: Order): order is OrderWithPhone => Boolean(order.phone); export const isOrderWithDeliveryAddress = (order: Order): order is OrderWithDeliveryAddress => Boolean(order.deliveryAddress); // Сообщение в рамках системы.
export interface Message<O extends Order> { errors: string[]; retry: number; order: O; // Интерфейс сообщения которое будет отправленно в хранилище ошибок
export interface FailOrder extends Message<Order> { exchange: string; routingKey: string;
}

Теперь нужно описать интерфейс конфигурации очередей и обменников, на основе которой будем строить структуру обработки в rabbit.

import from '../constants';
import { Options } from 'amqplib'; // Типы объектов RabbitMQ которые мы будем использовать для конфигурации
export enum Types { QUEUE = 'queue', EXCHANGE = 'exchange',
} // Типы обменников которые будем использовать
export enum ExchangeTypes { TOPIC = 'topic',
} // Интерфейс описания очереди
export interface Queue { name: string; options: Options.AssertQueue;
} // Интерфейс описания обменника
export interface Exchange { name: string; type: ExchangeTypes;
} // Интерфейс описания привязки
export interface Binding { type: Types; destination: string; source: string; routingKey: string;
} // Интерфейс конфигурации RabbitMQ
export interface PipelineConfig { queues: Queue[]; exchanges: Exchange[]; bindings: Binding[];
}

Описав основные компоненты системы, опишем конфигурацию, которая была нарисована на схеме с помощью объекта.

Queues

export default [ // Очередь для сообщений для которых нужно сгенерировать routingKey { name: 'generateRoutingKey', options: { durable: true, }, }, // Очередь отправки sms { name: 'sendSms', options: { durable: true, }, }, // Очередь интеграции со службой доставки { name: 'delivery', options: { durable: true, }, }, // Отложенная очередь для сообщений которые ожидают повторной отправки sms { name: 'sendSmsHold', options: { durable: true, deadLetterExchange: 'notify', deadLetterRoutingKey: 'sendSms', messageTtl: 60000, }, }, // Отложенная очередь для сообщений которые ожидают повторной отправки в службу доставки { name: 'deliveryHold', options: { durable: true, deadLetterExchange: 'integrates', deadLetterRoutingKey: 'delivery', messageTtl: 60000, }, },
];

При описании очередей используются следующие опции для очереди

  • durable. По умолчанию все сообщения очереди хранятся в памяти. Следовательно, при перезагрузке брокера сообщения пропадут. Для избежания этого можно использовать эту опцию. С этой настройкой rabbit будет сбрасывать сообщения на диск. Но тут есть один нюанс. Чтобы сообщения сохранились после рестарта брокера, мало этой настройки, нужно, чтобы сообщения отправлялись в очередь с опцией persistent.
  • messageTtl. Время жизни сообщения. Задаётся в миллисекундах
  • deadLetterExchange. Имя обменника, куда сообщение отправится из очереди при истечении ее срока жизни
  • deadLetterRoutingKey. RoutingKey, с которым сообщение будет отправлено в обменник из предыдущей опции

Exchanges

import { ExchangeTypes } from '../constants'; export default [ { name: 'postprocessing', type: ExchangeTypes.TOPIC, }, { name: 'notify', type: ExchangeTypes.TOPIC, }, { name: 'integrates', type: ExchangeTypes.TOPIC, },
];

Bindings

import { Types } from '../constants'; export default [ { type: Types.EXCHANGE, destination: 'notify', source: 'postprocessing', routingKey: '#.notify.#', }, { type: Types.EXCHANGE, destination: 'integrates', source: 'postprocessing', routingKey: '#.integrates.#', }, { type: Types.QUEUE, destination: 'generateRoutingKey', source: 'postprocessing', routingKey: 'generateRoutingKey', }, { type: Types.QUEUE, destination: 'sendSms', source: 'notify', routingKey: '#.sendSms.#', }, { type: Types.QUEUE, destination: 'delivery', source: 'integrates', routingKey: '#.delivery.#', }, { type: Types.QUEUE, destination: 'sendSmsHold', source: 'notify', routingKey: 'sendSmsHold', }, { type: Types.QUEUE, destination: 'deliveryHold', source: 'integrates', routingKey: 'deliveryHold', },
];

Полная конфигурация

import { PipelineConfig } from '../interfaces';
import exchanges from './exchanges';
import queues from './queues';
import bindings from './bindigs'; export const pipelineConfig: PipelineConfig = { exchanges, queues, bindings,
};

Для подключения к rabbit напишем класс.

import { connect, Connection, Channel } from 'amqplib'; export class RabbitConnect { private _uri: string; private _connection: Connection; private _chanel: Channel; constructor() { // Строка подключения к rabbit будет браться из окружения this._uri = process.env.RABBIT_URI || 'amqp://localhost'; } protected async connect() { this._connection = await connect(this._uri); this._chanel = await this._connection.createChannel(); } protected async disconnect() { await this._chanel.close(); return this._connection.close(); } protected get chanel() { return this._chanel; }
}

Напишем класс Pipeline, который при старте будет создавать всю необходимую инфраструктуру в rabbit по описанной ранее конфигурации.

import { RabbitConnect } from './RabbitConnect';
import { PipelineConfig } from './interfaces';
import { Types } from './constants'; export class Pipeline extends RabbitConnect { private _pipeline: PipelineConfig; constructor(pipelineConfig: PipelineConfig) { super(); this._pipeline = pipelineConfig; } public async create() { try { await this.connect(); // Создаем очереди const createQueues = this._pipeline.queues.map(queue => this.chanel.assertQueue(queue.name, queue.options), ) as PromiseLike<any>[]; // Создаём обменники const createExchanges = this._pipeline.exchanges.map(exchange => this.chanel.assertExchange(exchange.name, exchange.type), ) as PromiseLike<any>[]; await Promise.all([...createQueues, ...createExchanges]); // После создания необходимых компонентов создаём биндинги const createBindings = this._pipeline.bindings.map(binding => { if (binding.type === Types.QUEUE) { return this.chanel.bindQueue(binding.destination, binding.source, binding.routingKey); } return this.chanel.bindExchange(binding.destination, binding.source, binding.routingKey); }); await Promise.all(createBindings); return this.disconnect(); } catch (error) { console.error(error); throw new Error(error); } }
}

Теперь напишем абстрактный класс воркера с общим для всех воркеров функционалом, от которого можно будет наследоваться.

import { RabbitConnect } from './RabbitConnect';
import { Message, Order, FailOrder } from './interfaces';
import { ConsumeMessage } from 'amqplib'; export interface WorkerParams { maxRetry?: number; // Максимальное количество повторов обработки active: string; // Имя активной очереди exchange: string; // Имя обменника из которого пришло сообщение holdKey?: string; // Ключ маршрутизации для отложенной очереди
} export abstract class Worker<M extends Order> extends RabbitConnect { private _maxRetry: number; private _active: string; private _holdKey: string | undefined; protected exchange: string; private _currentMessage: Message<M>; private _currentConsumeMessage: ConsumeMessage; constructor({ active, holdKey, exchange, maxRetry }: WorkerParams) { super(); this._maxRetry = maxRetry || 0; this._active = active; this._holdKey = holdKey; this.exchange = exchange; } public async subscribe() { await this.connect(); this.chanel.consume(this._active, this.checkMessage.bind(this)); } // Метод проверки для сообщений у которых превышен лимит повторов private async checkMessage(message: ConsumeMessage) { this._currentConsumeMessage = message; const orderMessage: Message<M> = JSON.parse(message.content.toString()); if (orderMessage.retry >= this._maxRetry) { await this.sendToErrorStorage('Превышен лимит попыток'); } this._currentMessage = orderMessage; // Если количество попыток не превышено вызываем метод с бизнес логикой await this.handler(orderMessage.order || orderMessage); } // Метод отправки сообщения в хранилище ошибок protected async sendToErrorStorage(error: string) { const message: FailOrder = { order: this._currentMessage.order, errors: [...this._currentMessage.errors, error], retry: this._currentMessage.retry + 1, exchange: this.exchange, routingKey: this._active }; console.log('Отправка в хранилище ошибок', message); this.ack(); } // Метод отправки сообщения в отложенную очередь protected async hold(error: string) { if (!this._holdKey) { return; } const orderMessage = { order: this._currentMessage.order, errors: [...this._currentMessage.errors, error], retry: this._currentMessage.retry + 1, }; const orderData = Buffer.from(JSON.stringify(orderMessage)); return this.chanel.publish(this.exchange, this._holdKey, orderData); } // Метод подтверждения удачной обработки сообщения protected async ack() { return this.chanel.ack(this._currentConsumeMessage); } protected abstract handler(message: M): void;
}

Для этого у канала подключения есть метод ack. По умолчанию rabbit требует подтверждение уcпешной обработки сообщения от воркера. Если воркер не смог обработать сообщение, то существует метод nack, который говорит rabbit'у, чтобы он отправил сообщение другому воркеру.

Теперь мы можем написать несколько простых воркеров из схемы.

Воркер генерации ключа маршрутизации.

import { Worker } from '../Worker';
import { isOrderWithPhone, isOrderWithDeliveryAddress, Order, Message,
} from '../interfaces';
import { Keys } from '../constants'; export class GenerateRoutingKey extends Worker<Order> { constructor() { super({ active: 'generateRoutingKey', exchange: 'postprocessing', }); } protected async handler(order: Order) { try { const routingKey: string[] = []; if (isOrderWithPhone(order)) { routingKey.push(Keys.SEND_SMS); } if (isOrderWithDeliveryAddress(order)) { routingKey.push(Keys.SEND_TO_DELIVERY); } const message: Message<Order> = { retry: 0, errors: [], order, }; await this.chanel.publish( this.exchange, routingKey.join('.'), Buffer.from(JSON.stringify(message)), ); await this.ack(); } catch (error) { console.error(error); await this.sendToErrorStorage(error); } }
}

Воркеры отправки sms.

import { Worker } from '../Worker';
import { OrderWithPhone } from '../interfaces'; export class SendSms extends Worker<OrderWithPhone> { constructor() { super({ active: 'sendSms', exchange: 'notify', holdKey: 'sendSmsHold', maxRetry: process.env.MAX_RETRY ? parseInt(process.env.MAX_RETRY) : 5, }); } protected async handler(message: OrderWithPhone) { try { console.log('Отправка sms на номер: ', message.phone); this.ack(); } catch (error) { console.error(error); await this.hold(error); } }
}

Воркер интеграции со службой доставки.

import { Worker } from '../Worker';
import { OrderWithDeliveryAddress } from '../interfaces'; export class Delivery extends Worker<OrderWithDeliveryAddress> { constructor() { super({ active: 'delivery', exchange: 'interates', holdKey: 'deliveryHold', maxRetry: process.env.MAX_RETRY ? parseInt(process.env.MAX_RETRY) : 5, }); } protected async handler(message: OrderWithDeliveryAddress) { try { console.log('Отправка заказа в службу доставки на адрес: ', message.deliveryAddress); this.ack(); } catch (error) { console.error(error); await this.hold(error); } }
}

Точка входа в приложение.

import { Pipeline } from './Pipeline';
import { pipelineConfig } from './pipeline';
import { GenerateRoutingKey } from './workers/GenerateRoutingKey';
import { SendSms } from './workers/SendSms';
import { Delivery } from './workers/Delivery'; (async () => { try { const pipeline = new Pipeline(pipelineConfig); const generateRoutingKey = new GenerateRoutingKey(); const sendSms = new SendSms(); const delivery = new Delivery(); await pipeline.create(); await Promise.all([generateRoutingKey.subscribe(), sendSms.subscribe(), delivery.subscribe()]); } catch (error) { console.error(error); process.exit(1); }
})();

Это выходит за рамки данной статьи. Приводить пример кода класса записи заказа в базу и генерации номера интернет заказа я не буду. Для проверки кода можно воспользоваться веб интерфейсом rabbit'а, отправив в обменник posrprocessing json заказа.

Заключение

Нам не составит труда добавить в эту схему несколько очередей и воркеров, чтобы добавить нужный функционал. Такая схема построения обработки интернет заказа позволяет легко масштабировать систему. Преобразованная схема будет выглядеть так: Например, можно добавить отправку уведомлений на email или отправку заказа для учета в 1С.

Буду рад любым комментариям и критике. Надеюсь, вам понравилась статья. Весь представленный код можно найти на github

Теги
Показать больше

Похожие статьи

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *

Кнопка «Наверх»
Закрыть