Хабрахабр

Высокоуровневая репликация в СУБД Tarantool

Привет, я занимаюсь созданием приложений для СУБД Tarantool — это разработанная в Mail.ru Group платформа, совмещающая в себе высокопроизводительную СУБД и сервер приложений на языке Lua. Высокая скорость работы решений, основанных на Tarantool, достигается в частности за счет поддержки in-memory режима СУБД и возможности выполнения бизнес-логики приложения в едином адресном пространстве с данными. При этом обеспечивается персистентность данных с использованием ACID-транзакций (на диске ведется WAL-журнал). В Tarantool имеется встроенная поддержка репликации и шардирования. Начиная с версии 2.1, поддерживаются запросы на языке SQL. Tarantool имеет открытый исходный код и распространяется под лицензией Simplified BSD. Также имеется коммерческая Enterprise-версия.

(...aka enjoy the performance)
Feel the power!

В таких приложениях часто возникает необходимость репликации данных.
Как было сказано выше, в Tarantool есть встроенная репликация данных. Все перечисленное делает Tarantool привлекательной платформой для создания высоконагруженных приложений, работающих с БД. Обычно такая репликация (будем далее называть ее низкоуровневой) используется для обеспечения отказоустойчивости приложения и/или для распределения нагрузки на чтение между нодами кластера. Принцип ее работы заключается в последовательном выполнении на репликах всех транзакций, содержащихся в журнале мастера (WAL).

1.
Рис. Репликация внутри кластера

В последнем случае более удобным решением может оказаться использование высокоуровневой репликации — репликации данных на уровне бизнес-логики приложения. Примером альтернативного сценария может служить передача данных, созданных в одной БД, в другую БД для обработки/мониторинга. мы не используем готовое решение, встроенное в СУБД, а своими силами реализуем репликацию внутри разрабатываемого нами приложения. Т.е. Перечислим плюсы. У такого подхода есть как преимущества, так и недостатки.

Экономия трафика: 1.

  • можно передавать не все данные, а только их часть (например, можно передавать только некоторые таблицы, некоторые их столбцы или записи, соответствующие определенному критерию);
  • в отличие от низкоуровневой репликации, которая выполняется непрерывно в асинхронном (реализовано в текущей версии Tarantool — 1.10) или синхронном (будет реализовано в последующих версиях Tarantool) режиме, высокоуровневую репликацию можно выполнять сеансами (т.е. приложение сначала выполняет синхронизацию данных — сеанс обмена данными, затем наступает пауза в репликации, после которой происходит следующий сеанс обмена и т.д.);
  • если запись изменилась несколько раз, можно передать только ее последнюю версию (в отличие от низкоуровневой репликации, при которой на репликах будут последовательно проиграны все изменения, сделанные на мастере).

2. Отсутствуют сложности с реализацией обмена по HTTP, что позволяет синхронизировать удаленные БД.

2.
Рис. Репликация по HTTP

Структуры БД, между которыми передаются данные, не обязаны быть одинаковыми (более того, в общем случае возможно даже использование разных СУБД, языков программирования, платформ и т.п.). 3.

3.
Рис. Репликация в гетерогенных системах

Минус заключается в том, что в среднем программирование сложнее/затратнее, чем конфигурирование, и вместо настройки встроенного функционала придется реализовывать свой собственный.

Рассмотрим несколько способов реализации высокоуровневой репликации данных в СУБД Tarantool. Если в вашей ситуации приведенные плюсы играют решающее значение (или являются необходимым условием), то имеет смысл использовать высокоуровневую репликацию.

Минимизация трафика

Итак, одним из преимуществ высокоуровневой репликации является экономия трафика. Для того чтобы это преимущество проявилось в полной мере, необходимо минимизировать количество данных, передаваемых при каждом сеансе обмена. Разумеется, при этом не стоит забывать о том, что в конце сеанса приемник данных должен быть синхронизирован с источником (как минимум по той части данных, которая участвует в репликации).

Решением «в лоб» может быть отбор данных по дате-времени. Как же минимизировать количество данных, передаваемых при высокоуровневой репликации? Например, у документа «заказ» может быть поле «требуемое время исполнения заказа» — delivery_time. Для этого можно использовать уже имеющееся в таблице поле даты-времени (если оно есть). Таким образом, мы не можем запомнить максимальное значение поля delivery_time, переданное при предыдущем сеанса обмена, и при следующем сеансе обмена отобрать все записи с более высоким значением поля delivery_time. Проблема такого решения заключается в том, что значения в этом поле не обязаны располагаться в последовательности, соответствующей созданию заказов. Также заказ мог претерпеть изменения, которые тем не менее не затронули поле delivery_time. В промежутке между сеансами обмена могли добавиться записи с меньшим значением поля delivery_time. Для решения этих проблем нам потребуется передавать данные «внахлест». В обоих случаях изменения не будут переданы с источника в приемник. при каждом сеансе обмена мы будем передавать все данные со значением поля delivery_time, превышающим некоторый момент в прошлом (например, N часов от текущего момента). Т.е. Кроме того, в передаваемой таблице может не быть поля, связанного с датой-временем. Однако очевидно, что для крупных систем такой подход является сильно избыточным и может свести экономию трафика, к которой мы стремимся, на нет.

В этом случае при каждом сеансе обмена передаются все данные, получение которых не подтверждено получателем. Другое решение, более сложное с точки зрения реализации, заключается в подтверждении получения данных. Если приемник подтверждает получение записи, соответствующее поле принимает значение true, после чего запись более не участвует в обменах. Для реализации потребуется добавить в таблицу-источник булевскую колонку (например, is_transferred). Во-первых, для каждой переданной записи необходимо сгенерировать и отправить подтверждение. Такой вариант реализации имеет следующие минусы. Во-вторых, отсутствует возможность отправки одной и той же записи в несколько приемников (первый получивший приемник подтвердит получение за себя и за всех остальных). Грубо говоря, это может быть сопоставимо с удвоением количества передаваемых данных и привести к удвоению количества раундтрипов.

Такая колонка может иметь тип дата-время и должна задаваться/обновляться приложением на текущее время каждый раз при добавлении/изменении записей (атомарно с добавлением/изменением). Способ, лишенный недостатков, приведенных выше, состоит в добавлении в передаваемую таблицу колонки для отслеживания изменений ее строк. Сохранив максимальное значение поля этой колонки для переданных записей, мы сможем начать следующий сеанс обмена с этого значения (отобрать записи со значением поля update_time, превышающим сохраненное ранее значение). В качестве примера назовем колонку update_time. В результате значения полей в колонке update_time могут быть не уникальными. Проблема, связанная с последним подходом, заключается в том, что изменения данных могут происходить в пакетном режиме. Для постраничной выдачи данных придется изобретать дополнительные механизмы, которые, скорее всего, будут иметь очень низкую эффективность (например, извлечение из БД всех записей со значением update_time выше заданного и выдача определенного количества записей, начиная с некоторого смещения от начала выборки). Таким образом, эта колонка не может быть использована для порционной (постраничной) выдачи данных.

Для этого в качестве значений полей колонки для отслеживания изменений будем использовать целочисленный тип (длинное целое). Можно повысить эффективность передачи данных, немного усовершенствовав предыдущий подход. Значение поля этой колонки по-прежнему должно задаваться/обновляться каждый раз при создании/изменении записи. Назовем колонку row_ver. В результате колонка row_ver будет содержать уникальные значения и сможет быть использована не только для выдачи «дельты» данных (данных, добавившихся/изменившихся после завершения предыдущего сеанса обмена), но и для простой и эффективной разбивки их на страницы. Но в данном случае полю будет присваиваться не текущее дата-время, а значение некоторого счетчика, увеличенного на единицу.

Остановимся на нем подробнее. Последний предложенный способ минимизации количества данных, передаваемых в рамках высокоуровневой репликации, представляется мне наиболее оптимальным и универсальным.

Передача данных с использованием счетчика версий строк

Реализация серверной/master части

В MS SQL Server для реализации подобного подхода существует специальный тип колонки — rowversion. Каждая БД имеет счетчик, который увеличивается на единицу каждый раз при добавлении/изменении записи в таблице, имеющей колонку типа rowversion. Значение этого счетчика автоматически присваивается полю этой колонки в добавившейся/изменившейся записи. СУБД Tarantool не имеет аналогичного встроенного механизма. Однако в Tarantool его несложно реализовать вручную. Рассмотрим, как это делается.

В Tarantool можно создавать последовательности (sequence). Для начала немного терминологии: таблицы в Tarantool называются спейсами (space), а записи — кортежами (tuple). Т.е. Последовательности представляют из себя не что иное, как именованные генераторы упорядоченных значений целых чисел. Ниже мы создадим такую последовательность. это как раз то, что нужно для наших целей.

Прежде чем выполнить какую-либо операцию с базой данных в Tarantool, необходимо выполнить следующую команду:

box.cfg

В результате Tarantool начнет записывать в текущий каталог снимки БД (snapshot) и журнал транзакций.

Создадим последовательность row_version:

box.schema.sequence.create('row_version', { if_not_exists = true })

Опция if_not_exists позволяет выполнять скрипт создания многократно: если объект существует, Tarantool не будет пытаться создать его повторно. Эта опция будет использоваться во всех последующих DDL-командах.

Создадим спейс для примера.

box.schema.space.create('goods', { format = { { name = 'id', type = 'unsigned' }, { name = 'name', type = 'string' }, { name = 'code', type = 'unsigned' }, { name = 'row_ver', type = 'unsigned' } }, if_not_exists = true
})

Здесь мы задали имя спейса (goods), имена полей и их типы.

Создадим автоинкрементный первичный ключ по полю id: Автоинкрементные поля в Tarantool создаются тоже с помощью последовательностей.

box.schema.sequence.create('goods_id', { if_not_exists = true })
box.space.goods:create_index('primary', { parts = { 'id' }, sequence = 'goods_id', unique = true, type = 'HASH', if_not_exists = true
})

Tarantool поддерживает несколько типов индексов. Чаще всего используются индексы типов TREE и HASH, в основе которых лежат соответствующие наименованию структуры. TREE — наиболее универсальный тип индекса. Он позволяет извлекать данные в упорядоченном виде. Но для выбора по равенству больше подходит HASH. Соответственно, для первичного ключа целесообразно использовать HASH (что мы и сделали).

Но в отличии от первичного ключа, значение поля колонки row_ver должно увеличиваться на единицу не только при добавлении новых записей, но и при изменении существующих. Чтобы использовать колонку row_ver для передачи изменившихся данных, необходимо привязать к полям этой колонки значения последовательности row_ver. В Tarantool есть два типа триггеров для спейсов: before_replace и on_replace. Для этого можно использовать триггеры. В отличие от on_replace, before_replace-триггеры позволяют модифицировать данные кортежа, для которого выполняется триггер. Триггеры запускаются при каждом изменении данных в спейсе (для каждого кортежа, затронутого изменениями, запускается функция триггера). Соответственно, нам подходит последний тип триггеров.

box.space.goods:before_replace(function(old, new) return box.tuple.new({new[1], new[2], new[3], box.sequence.row_version:next()})
end)

Приведенный триггер заменяет значение поля row_ver сохраняемого кортежа на очередное значение последовательности row_version.

Для того чтобы можно было извлекать данные из спейса goods по колонке row_ver, создадим индекс:

box.space.goods:create_index('row_ver', { parts = { 'row_ver' }, unique = true, type = 'TREE', if_not_exists = true
})

Тип индекса — дерево (TREE), т.к. данные нам потребуется извлекать в порядке возрастания значений в колонке row_ver.

Добавим в спейс некоторые данные:

box.space.goods:insert{nil, 'pen', 123}
box.space.goods:insert{nil, 'pencil', 321}
box.space.goods:insert{nil, 'brush', 100}
box.space.goods:insert{nil, 'watercolour', 456}
box.space.goods:insert{nil, 'album', 101}
box.space.goods:insert{nil, 'notebook', 800}
box.space.goods:insert{nil, 'rubber', 531}
box.space.goods:insert{nil, 'ruler', 135}

Т.к. первое поле — автоинкрементный счетчик, передаем вместо него nil. Tarantool автоматически подставит очередное значение. Аналогичным образом в качестве значения полей колонки row_ver можно передать nil — или не указывать значение вообще, т.к. эта колонка занимает последнюю позицию в спейсе.

Проверим результат вставки:

tarantool> box.space.goods:select()
---
- - [1, 'pen', 123, 1] - [2, 'pencil', 321, 2] - [3, 'brush', 100, 3] - [4, 'watercolour', 456, 4] - [5, 'album', 101, 5] - [6, 'notebook', 800, 6] - [7, 'rubber', 531, 7] - [8, 'ruler', 135, 8]
...

Как видим, первое и последнее поле заполнились автоматически. Теперь будет несложно написать функцию для постраничной выгрузки изменений спейса goods:

local page_size = 5
local function get_goods(row_ver) local index = box.space.goods.index.row_ver local goods = {} local counter = 0 for _, tuple in index:pairs(row_ver, { iterator = 'GT' }) do local obj = tuple:tomap({ names_only = true }) table.insert(goods, obj) counter = counter + 1 if counter >= page_size then break end end return goods
end

Функция принимает в качестве параметра значение row_ver, начиная с которого необходимо осуществить выгрузку изменений, и возвращает порцию изменившихся данных.

Функция get_goods использует итератор по индексу row_ver для получения изменившихся данных. Выборка данных в Tarantool производится через индексы. Это означает, что итератор будет осуществлять последовательный обход значений индекса начиная с переданного ключа (значения поля row_ver). Тип итератора — GT (Greater Than, больше чем).

Чтобы впоследствии иметь возможность передать данные по HTTP, необходимо выполнить преобразование кортежей к структуре, удобной для последующей сериализации. Итератор возвращает кортежи. Вместо использования tomap можно написать собственную функцию. В примере для этого используется стандартная функция tomap. Например, мы можем захотеть переименовать поле name, не передавать поле code и добавить поле comment:

local function unflatten_goods(tuple) local obj = {} obj.id = tuple.id obj.goods_name = tuple.name obj.comment = 'some comment' obj.row_ver = tuple.row_ver return obj
end

Размер страницы выдаваемых данных (количество записей в одной порции) определяется переменной page_size. В примере значение page_size равно 5. В реальной программе размер страницы обычно имеет большее значение. Он зависит от среднего размера кортежа спейса. Оптимальный размер страницы можно подобрать опытным путем, замеряя время передачи данных. Чем больше размер страницы, тем меньше количество раундтрипов между передающей и принимающей стороной. Так можно уменьшить общее время выгрузки изменений. Однако при слишком большом размере страницы мы будем слишком долго занимать сервер сериализацией выборки. В результате могут возникнуть задержки при обработке других запросов, пришедших на сервер. Параметр page_size можно загрузить из конфигурационного файла. Для каждого передаваемого спейса можно задать свое значение. При этом для большинства спейсов может подойти значение по умолчанию (например, 100).

Выполним функцию get_goods:

tarantool> get_goods(0) ---
- - row_ver: 1 code: 123 name: pen id: 1 - row_ver: 2 code: 321 name: pencil id: 2 - row_ver: 3 code: 100 name: brush id: 3 - row_ver: 4 code: 456 name: watercolour id: 4 - row_ver: 5 code: 101 name: album id: 5
...

Возьмем значение поля row_ver из последней строки и вновь вызовем функцию:

tarantool> get_goods(5) ---
- - row_ver: 6 code: 800 name: notebook id: 6 - row_ver: 7 code: 531 name: rubber id: 7 - row_ver: 8 code: 135 name: ruler id: 8
...

И еще раз:

tarantool> get_goods(8)
---
- []
...

Как видим, при таком использовании функция постранично возвращает все записи спейса goods. За последней страницей следует пустая выборка.

Внесем изменения в спейс:

box.space.goods:update(4, {{'=', 6, 'copybook'}})
box.space.goods:insert{nil, 'clip', 234}
box.space.goods:insert{nil, 'folder', 432}

Мы изменили значение поля name для одной записи и добавили две новых записи.

Повторим последний вызов функции:

tarantool> get_goods(8)
--- - - row_ver: 9 code: 800 name: copybook id: 6 - row_ver: 10 code: 234 name: clip id: 9 - row_ver: 11 code: 432 name: folder id: 10
...

Функция вернула изменившуюся и добавившиеся записи. Таким образом, функция get_goods позволяет получать данные, изменившиеся с момента ее последнего вызова, что и является основой рассматриваемого способа репликации.

Об этом можно прочитать здесь: https://habr.com/ru/company/mailru/blog/272141/ Оставим выдачу результатов по HTTP в виде JSON за рамками настоящей статьи.

Реализация клиентской/slave части

Рассмотрим, как выглядит реализация принимающей стороны. Создадим на принимающей стороне спейс для хранения загруженных данных:

box.schema.space.create('goods', { format = { { name = 'id', type = 'unsigned' }, { name = 'name', type = 'string' }, { name = 'code', type = 'unsigned' } }, if_not_exists = true
}) box.space.goods:create_index('primary', { parts = { 'id' }, sequence = 'goods_id', unique = true, type = 'HASH', if_not_exists = true
})

Структура спейса напоминает структуру спейса в источнике. Но поскольку мы не собираемся передавать полученные данные куда-либо еще, колонка row_ver в спейсе получателя отсутствует. В поле id будут записываться идентификаторы источника. Поэтому на стороне приемника нет необходимости делать его автоинкрементным.

Кроме этого, нам потребуется спейс для сохранения значений row_ver:

box.schema.space.create('row_ver', { format = { { name = 'space_name', type = 'string' }, { name = 'value', type = 'string' } }, if_not_exists = true
}) box.space.row_ver:create_index('primary', { parts = { 'space_name' }, unique = true, type = 'HASH', if_not_exists = true
})

Для каждого загружаемого спейса (поле space_name) будем сохранять здесь последнее загруженное значение row_ver (поле value). В качестве первичного ключа выступает колонка space_name.

Для этого нам потребуется библиотека, реализующая HTTP-клиент. Создадим функцию для загрузки данных спейса goods по HTTP. Следующая строка загружает библиотеку и создает экземпляр HTTP-клиента:

local http_client = require('http.client').new()

Также нам потребуется библиотека для десериализации json:

local json = require('json')

Этого достаточно для создания функции загрузки данных:

local function load_data(url, row_ver) local url = ('%s?rowVer=%s'):format(url, tostring(row_ver)) local body = nil local data = http_client:request('GET', url, body, { keepalive_idle = 1, keepalive_interval = 1 }) return json.decode(data.body)
end

Функция выполняет HTTP-запрос по адресу url, передает в него row_ver в качестве параметра и возвращает десериализованный результат запроса.

Функция сохранения полученных данных выглядит следующим образом:

local function save_goods(goods) local n = #goods box.atomic(function() for i = 1, n do local obj = goods[i] box.space.goods:put( obj.id, obj.name, obj.code) end end)
end

Цикл сохранения данных в спейс goods помещен в транзакцию (для этого используется функция box.atomic) для уменьшения количества операций с диском.

Наконец, функцию синхронизации локального спейса goods с источником можно реализовать так:

local function sync_goods() local tuple = box.space.row_ver:get('goods') local row_ver = tuple and tuple.value or 0 —— set your url here: local url = 'http://127.0.0.1:81/test/goods/list' while true do local goods = load_goods(url, row_ver) local count = #goods if count == 0 then return end save_goods(goods) row_ver = goods[count].rowVer box.space.row_ver:put({'goods', row_ver}) end
end

Сначала считываем сохраненное ранее значение row_ver для спейса goods. Если оно отсутствует (первый сеанс обмена), то берем в качестве row_ver ноль. Далее в цикле производим постраничную загрузку измененных данных из источника по указанному url. На каждой итерации сохраняем полученные данные в соответствующий локальный спейс и обновляем значение row_ver (в спейсе row_ver и в переменной row_ver) — берем значение row_ver из последней строки загруженных данных.

Для защиты от случайного зацикливания (в случае ошибки в программе) цикл while можно заменить на for:

for _ = 1, max_req do ...

В результате выполнения функции sync_goods спейс goods в приемнике будет содержать последние версии всех записей спейса goods в источнике.

Если такая необходимость существует, можно использовать пометку на удаление. Очевидно, что таким способом нельзя транслировать удаление данных. Иногда вместо булевского поля is_deleted удобнее использовать поле deleted, в котором хранится дата-время логического удаления записи. Добавляем в спейс goods булевское поле is_deleted и вместо физического удаления записи используем логическое удаление — выставляем значение поля is_deleted в значение true. После выполнения логического удаления помеченная на удаление запись будет передана из источника в приемник (согласно рассмотренной выше логике).

Последовательность row_ver можно использовать для передачи данных других спейсов: нет необходимости в создании отдельной последовательности для каждого передаваемого спейса.

Мы рассмотрели эффективный способ высокоуровневой репликации данных в приложениях, использующих СУБД Tarantool.

Выводы

  1. СУБД Tarantool — привлекательный, перспективный продукт для создания высоконагруженных приложений.
  2. Высокоуровневая репликация данных имеет ряд преимуществ по сравнению с низкоуровневой репликацией.
  3. Рассмотренный в статье способ высокоуровневой репликации позволяет минимизировать количество передаваемых данных путем передачи только тех записей, которые изменились после последнего сеанса обмена.
Теги
Показать больше

Похожие статьи

Добавить комментарий

Ваш e-mail не будет опубликован. Обязательные поля помечены *

Кнопка «Наверх»
Закрыть