Хабрахабр

Применение принципов функционального программирования при проектировании ERP

Привет, Хабр!

Существующие системы сложны. В этой статье мы попробуем взглянуть на архитектуру учетных систем (ERP, CRM, WMS, MES, B2B, ...) с позиций функционального программирования. При этом единственным «источником правды» в таких системах является хронологически-упорядоченный журнал первичных документов (отпечатков событий реального мира), которые, очевидно, должны быть иммутабельными (и это правило соблюдается в аудируемых системах, где корректировки «задним числом» запрещены). Они базируются на реляционной схеме данных, и имеют огромный мутабельный стейт в виде сотен связаных таблиц. Журнал документов составляет от силы 20% объема БД, а все остальное — промежуточные абстракции и агрегаты, с которыми удобно работать на языке SQL, но которые требуют постоянной синхронизации с документами, и между собой.

Проблема производительности решается благодаря мемоизации, а объем функционального кода будет вполне соизмерим с объемом декларативного SQL, и не сложнее для понимания. Если вернуться к истокам (устранить избыточность данных и отказаться от хранения агрегатов), а все бизнес-алгоритмы реализовать в виде функций, применяемых непосредственно к потоку первичных документов — мы получим функциональную СУБД, и построенную на ней функциональную ERP. В данной статье мы продемонстрируем подход, разработав простейшую файловую СУБД на языке TypeScript и рантайме Deno (аналог Node.js), а также протестируем производительность сверток на примере типичных бизнес-задач.

Почему это актуально

1) Мутабельный стейт + избыточность данных — это плохо, особенно когда необходимо обеспечивать его постоянную синхронизацию с потоком документов. Это источник потенциальных расхождений учетных данных (баланс не сходится) и трудно обнаруживаемых побочных эффектов.
2) Жесткая реляционная схема хранения исходных и промежуточных данных дорого обходится в Big Data, гетерогенных системах, и в условиях быстрых изменений — то есть по сути везде. Мы предлагаем хранить документы в исходном виде, упорядочив по времени, разрешив связи «от новых к старым» и никогда наоборот. Это позволит рассчитывать большинство агрегатов однопроходными алгоритмами прямо из документов, а все остальные таблицы — не нужны.

Это требует небольшого переосмысления языка запросов, и сознательной заботы о кэшировании. 3) SQL устарел, так как предполагает доступность любых данных в любой момент, а в распределенных системах это очевидно не так — при разработке алгоритмов Big Data нужно быть готовым к тому, что часть необходимых данных появится позже, а часть уже появлялась раньше.

А если говорить о серверах — предлагаемая схема имеет больше возможностей для параллелизма, в том числе на кластерах типа SPARK. 4) Современные ЯП позволяют создать отзывчивую систему, оперирующую миллионами записей локально на ноутбуке, где РСУБД просто не установится.

Предыстория вопроса

Проработав достаточно долго с различным бизнес-ПО (системы учета, планирования, WMS), практически везде сталкивался с двумя проблемами — сложность внесения изменений в схему данных, и частое падение производительности, когда эти изменения таки вносились. Вообще, эти системы имеют сложную структуру, поскольку к ним предъявляются противоречивые требования:

Разделение на справочники и операции весьма условно, во взрослых системах справочники огранизованы с версионированием, где каждое изменение оформляется специальным документом. 1) Аудируемость. Нужно хранить все первичные документы в неизменном виде. Таким образом, исходные документы — это иммутабельная часть системы, и она является единственным «источником правды», а все остальные данные могут быть восстановлены из нее.

Естественно, вся необходимая информация не может быть вычислена «на лету», а должна быть доступной в полу-готовом виде. 2) Производительность запросов. Например, при создании строки заказа на продажу система должна рассчитать цену товара с учетом скидок, для чего необходимо извлечь статус клиента, его текущий баланс, историю покупок, текущие акции в регионе, и т.д. Их объем составляет до 80% размера БД, структура таблиц жестко фиксирована, при любых изменениях в алгоритмах — программист должен позаботиться о правильном обновлении агрегатов. Поэтому существующие системы хранят удобные абстракции над строками документов (проводки), а также заранее рассчитанные агрегаты (регистры накопления, временные срезы, текущие остатки, сводные проводки, и т.д.). По сути агрегаты это и есть мутабельное состояние системы.

Поэтому алгоритмы обновления агрегатов — самая болезненная точка системы, и при внесении большого количества изменений имеется существенный риск что-то сломать, и тогда данные «разъедутся», то есть агрегаты перестанут соответствовать документам. 3) Транзакционная производительность. При проведении любого документа нужно пересчитать все агретаты, а это обычно блокирующая операция. Эта ситуация — бич всех проектов внедрения и последующей поддержки.

Устанавливаем основы новой архитектуры

1) Хранение. Основа БД — хронологически упорядоченный журнал документов, отражающих свершившиеся факты реального мира. Справочники это тоже документы, просто длительного действия. И документ, и каждая версия записи справочника — иммутабельны. Никаких других данных в виде проводок / регистров / остатков в системе не хранится (сильное провокационное утверждение, в жизни бывает по разному, но нужно стремиться к совершенству). Документ имеет ряд системных атрибутов:

, ...
}

Документы с одинаковым code и разными ts образуют историческую группу, где актуальной считается последняя запись, остальные — историческими. Если установлен атрибут cache, последняя запись из группы попадают в так называемый топ-кэш, и одновременно все записи попадают в фулл-кэш, таким образом мы можем быстро извлечь запись справочника как по id, так и по code.

Изменение или удаление (отмена) старого документа — это всегда новый документ, записываемый в журнал с текущим ts. Документы могут дописываться в конец журнала, и никогда в середину. Таким образом, причинно-следственная связь определяется положением документа в журнале, скачки в прошлое или будущее запрещены (при этом даты принятия к учету, даты исполнения планов могут быть любыми, но с точки зрения ядра системы это просто атрибуты).

В отличие от «sql foreign key» — указывать тип сущности, на которую ссылаемся, нет необходимости, так как сущности лежат вперемешку, а id уникален. 2) Связи. Документы могут ссылаться друг на друга по id. Это означает, что в любом пользовательском алгоритме при обработке текущего документа могут быть востребованы связанные документы, уже встречавшиеся в выборке ранее (и по идее они должны быть кэшированы — либо ядром, либо пользовательским алгоритмом). Связи от ранних документов к более поздним запрещены.

открытые документы) не может быть зафиксирована, поэтому вводится понятие горизонта иммутабельности, а база данных разделяется на 2 физических хранилища — иммутабельное хранилище и текущее хранилище. 3) Горизонт иммутабельности. Часть документов, с которыми ведется активная работа (т.н. Все что позднее — называется текущим периодом, и при каждом запросе второе хранилище сканируется заново. Все документы в первом хранилище имеют временную метку меньше горизонта, они неизменны, а результаты всех сверток кэшируются и переиспользуются. Горизонт иммутабельности — термин, хорошо знакомый коллегам из 1С, и бухгалтерам. Такая схема дает линейное время. Производительность системы зависит исключительно от размера бардака текущего периода, и в этом вопросе мировая бизнес-практика беспощадна — чем он меньше, тем лучше.

Любой бизнес-алгоритм — это композиция функций filter(), reduce(), get(), gettop(), примененная к потоку документов. 4) Алгоритмы. Журнал документов может храниться в любом виде — последовательный файл, документная БД, таблица РСУБД, поступать из внешнего стрима — главное чтобы они были извлекаемы в прямом либо обратном хронологическом порядке. Естественно, помогает системный кэш, хранящий как отдельные документы, к которым уже были запросы по id / code, так и результаты расчетов, выполненных ранее (при полном совпадении входных параметров этих расчетов). Ввиду отсутствия семантики JOIN, у пользователя остается возможность использовать вложенные подзапросы к БД, либо пытаться ограничиться одним проходом, помещая в пользовательский кэш все, что может потребоваться в будущем.

Результаты запросов и расчетов попадают в кэш в случаях:
— документ имеет атрибут cache, при первом reduce() он записывается в фулл-кэш, и обновляет запись в топ-кэше;
— документ извлечен запросом по id / code, и он находится в иммутабельном хранилище;
reduce() завершил расчет по иммутабельному хранилищу, промежуточный результат клонируется и записывается в кэш, а расчет продолжается по текущему хранилищу.
Мы видим, что в отличие от жестко-структурированного «кэша» в системах, основанных на РСУБД, мы имеем адаптивный кэш, наполняемый по мере работы системы. 5) Мемоизация, или кэширование. Пользователю даются ограниченные инструменты управления кэшем. Необходимость экономить память заставляет ограничивать объем кэшируемой информации, поэтому, например, результат функции filter() не кэшируется, а результат reduce() — обязательно.

Первый — когда при обработке текущего документа нам нужно найти несколько связанных документов по неточным критериям. 6) Поиск бывает 3-х видов. Второй вид поиска — когда нам нужно получить актуальные элементы справочника, без исторических данных (т.н. В этом случае либо подзапрос, либо в своем reduce() заранее сохраням все что может потенциально потребоваться, и когда потребовалось — ищем в этой выборке. В этом случае используется топ-кэш, который как раз хранит такие элементы. текущий срез). Насколько целесообразно кэшировать результаты пользовательских поисков — вопрос дискуссионный, в какой-то мере очевидно необходимо, например с ограничением объема выборки. В третьем случае это fullscan по базе в обратной хронологической последовательности.

При перемещении документа из текущего хранилища в иммутабельное — по идее нужно до-обновить все кэшированные агрегаты, эта операция тяжелая (зависит от количества переносимых документов и количества результатов в кэше), и должна выполняться по аналогии с закрытием учетного периода — в часы наименьшей нагрузки. 7) Добавление документов и перемещение горизонта. При добавлении нового документа актуализируется только топ-кэш.

Простая функциональная СУБД

Итак, попробуем что-нибудь написать. Язык TypeScript выбран за идеальное сочетание скриптового динамизма и типизации, рантайм Deno — за удобную поддержку TypeScript и WASM, а также наличия Rust API, что теоретически дает нам шанс ускорить некоторые алгоритмы (хотя это неточно).
Документы в нашей СУБД будут храниться в виде 2-х последовательных файлов, содержащих объекты JSON, разделенные символом "\x01", так как это позволяет написать быстрый потоковый парсер. API чтения состоит пока всего из 3-х функций:

type Document = any
type Result = any public async get(id: string): Promise<Document | undefined> public async gettop(code: string): Promise<Document | undefined> public async reduce( filter: (result: Result, doc: Document) => Promise<boolean>, reducer: (result: Result, doc: Document) => Promise<void>, result: Result
): Promise<Result>

Первая функция возвращает документ по id, вторая возвращает последний документ с заданным code, третья осуществляет фильтрацию и свертку, принимая на вход функцию фильтрации, функцию свертки и начальное значение аккумулятора. Мы сознательно не использовуем цепочку filter().reduce(), так как хотим кэшировать итоговый результат, а в случае цепочки — кэшировать отдельно результат фильтрации расточительно, а кэшировать результат свертки без знания условий фильтрации — бессмысленно. Поэтому reduce() получает на вход сразу все необходимое для расчета, и использует составной хэш от трех параметров в качестве ключа мемоизации.

Обратите внимание, что оба колбэка возвращают промис, то есть внутри них разрешены вложенные запросы get() и reduce(). Собственно, весь пользовательский алгортм представляет собой реализацию колбэков filter и reducer, а аккумулятор-результат может быть любым сериализуемым объектом. второй тест). Благодаря промисам вложенный цикл (например по строкам текущего документ) можно параллелить (см.

Исходные данные

Рассмотрим простейшую систему учета покупок и продаж. Нам нужны справочники контрагентов и номенклатур, а также документы покупки и продажи. Если мы хотим считать себестоимость расходов и маржу, нужен еще один документ — сопоставление приходов с расходами, но это уже тема отдельной статьи.

Партнеры и номенклатуры

{ "sys": { "code": "partner.1", "ts": 1578263624612, "id": "partner.1^1578263624612", "cache": 1 }, "type": "partner.retail", "name": "Рога и копыта ООО"
}
{ "sys": { "code": "invent.1", "ts": 1578263624612, "id": "invent.1^1578263624612", "cache": 1 }, "type": "invent.material", "name": "Гвоздь строительный 20мм"
}

Атрибут type — пользовательский, его иерархия никак не используется ядром, а лишь в пользовательских алгоритмах. Также не имеет значение семантика атрибута code — для ядра это просто строка.

Покупки и продажи

{ "sys": { "code": "purch.1", "ts": 1578263624613, "id": "purch.1^1578263624613" }, "type": "purch", "date": "2020-01-07", "partner": "partner.3^1578263624612", "lines": [ { "invent": "invent.0^1578263624612", "qty": 2, "price": 232.62838134273366 }, { "invent": "invent.1^1578263624917", "qty": 24, "price": 174.0459600393788 } ]
}

Документы отличаются только типом (purch | sale), cтроки хранятся прямо в документе (в реляционной схеме они лежали бы в отдельной таблице).

Реализация алгоритмов

Анализ продаж
Считаем общую сумму продаж, средний чек, и среднее количество строк на документ.

import { FuncDB } from "./FuncDB.ts"
const db = FuncDB.open('./sample_database/') let res = await db.reduce( async (_, doc) => doc.type == 'sale', // фильтруем только продажи async (result, doc) => { result.doccou++ doc.lines.forEach(line => { // цикл по строкам документа result.linecou++ result.amount += line.price * line.qty }) }, {amount: 0, doccou: 0, linecou: 0} // инициализируем аккумулятор
) console.log(` amount total = ${res.amount} amount per document = ${res.amount / res.doccou} lines per document = ${res.linecou / res.doccou}`
)

Обороты в разрезе номенклатур и партнеров
По сути это сводная таблица, поэтому в качестве аккумулятора используем Map.

class ResultRow { // строка результирующей таблицы invent_name = '' partner_name = '' debit_qty = 0 debit_amount = 0 credit_qty = 0 credit_amount = 0
} let res = await db.reduce( async (_, doc) => doc.type == 'purch' || doc.type == 'sale', async (result, doc) => { // поскольку внутри цикла у нас await - параллелим обработку строк const promises = doc.lines.map(async (line) => { const key = line.invent + doc.partner let row = result.get(key) if (row === undefined) { row = new ResultRow() // наименования получаем подзапросами к базе (они кэшируются) const invent = await db.get(line.invent) const partner = await db.get(doc.partner) row.invent_name = invent ? invent.name : line.invent + ' not found' row.partner_name = partner ? partner.name : doc.partner + ' not found' result.set(key, row) } if (doc.type == 'purch') { row.debit_qty += line.qty row.debit_amount += line.qty * line.price } else if (doc.type == 'sale') { row.credit_qty += line.qty row.credit_amount += line.qty * line.price } }) await Promise.all(promises) }, new Map<string, ResultRow>() // результирующая таблица (аккумулятор)
)

Мы видим, что половину кода составляет извлечение наименований подзапросами. Это легко исправить, написав сервисную функцию, но для общего понимания оставлю так. Обратите внимание, что мы параллелим обработку строк — в случае если номенклатуры нет в кэше — запускается fullscan, результата которого в нашем случае ждать необязательно.

Бенчмаркинг

Тестируем на сгенерированных данных:
иммутабельное хранилище: 100 номенклатур + 100 контрагентов + 100 тыс. документов
текущее хранилище: 10 номенклатур + 10 контрагентов + 10 тыс. документов
Использую доисторический ноутбук с процессором Intel Celeron CPU N2830 @ 2.16 GHz

Видно что второй раз иммутабельное хранилище не обрабатывается, и расчет происходит в 10 раз быстрее. В первом тесте демонстрируем кэширование — сначала запускаем анализ продаж, затем добавляем новый документ продажи, и снова запускаем расчет.

Результаты - 100 тыс. документов за 11.1 секунды:

file: database_immutable.json:
 100200 docs parsed (0 errors)
 50018 docs processed (0 errors)
11.098s elapsed
file: database_current.json:
 10020 docs parsed (0 errors)
 4987 docs processed (0 errors)
1.036s elapsed
amount total = 623422871.2641689
amount per document = 11389.839613851627
lines per document = 3.6682561432355896

034s elapsed
amount total = 623433860. file: database_current.json:
 10021 docs parsed (0 errors)
 4988 docs processed (0 errors)
1. 832290707558
lines per document = 3. 2641689
amount per document = 11389. 6682073954983925

Если честно, я рассчитывал минимум на миллион документов в секунду. Разберемся, где у нас основная задержка на примере обработки первого файла:
8.8s — чтение файла и извлечение строковых JSON, разделенных символом "\x01"
1.9s — парсинг JSON в объекты
0.4s — кэширование + пользовательский алгоритм
Заглянув в исходники Deno, я понял, основная задержка возникает при декодировании unicode, ведь V8 в качестве байто-дробилки подходит плохо. Это значит, что переписать критические куски на WASM/Rust будет очень просто, а если в качестве хранилища использовать нормальную объектную БД, то и парсинга JSON можно избежать, и тогда достичь миллиона записей в секунду — более чем реально. И это я не говорю про нормальное железо.

Второй раз производительность упала в 3 раза, потому что в первом тесте мы добавили документ продажи, ссылающийся на несуществующего партнера и номенклатуру, и, не найдя их в кэше, система вынуждена дважды запускать fullscan. Второй тест мы запускаем сначала на свеже-сгенерированной базе, затем после прогона первого теста. Но эта ситуация по сути аварийная, а нормальная выглядит так:

Результаты - 100 тыс. документов за 13.3 секунды:

307s elapsed
file: database_current.json:
 10020 docs parsed (0 errors)
 10000 docs processed (0 errors)
1. file: database_immutable.json:
 100200 docs parsed (0 errors)
 100000 docs processed (0 errors)
13. 296s elapsed

Наконец-то пошла рабочая нагрузка, мы делаем 2 вложенных асинхронных запроса, и получаем затраты на пользовательский алгоритм — 2.6s. Допускаю, что оборачивание любой функции в промис накладно, а в нашем случае запрос к кэшу выполняется синхронно, и возможно это место стоит оптимизировать.

Резюме

В целом я доволен результатом, схема получилась вполне рабочая, проект можно развивать, если у кого есть мысли — пишите. Буду благодарен за ссылку на публичные обфусцированные данные, приближенные к реальности (счета-фактуры, EDI, или что-то подобное), необходимые для полноценного тестирования.

Полный код на гитхабе

Спасибо за внимание.

Показать больше

Похожие публикации

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *

Кнопка «Наверх»