Хабрахабр

[Из песочницы] Логическая репликация из PostgreSQL в Erlang

Приложение Erlang кэширует настройки в ETS для ускорения обработки и снижения нагрузки на БД путём отказа от постоянных запросов. Довольно типичная схема при разработке системы, когда основная логика обработки сосредоточена в приложении (в нашем случае Erlang), а данные для работы этого приложения (настройки, профили пользователей и т. д.) в базе данных (PostgreSQL). При этом изменение этих данных происходит через отдельный (возможно, внешний) сервис.

Есть разные подходы для решения этой задачи. В таких ситуациях встаёт задача поддержания закэшированных данных в актуальном состоянии. О нем и пойдёт речь ниже. Один из них — это логическая репликация PostgreSQL.

Этот механизм аналогичный тому, который используется для физической репликации для создания standby БД. Логическая репликация использует протокол потоковой репликации PostgreSQL для получения изменения данных в таблицах PostgreSQL путём чтения WAL логов, фильтрации нужный таблиц и отправки этих изменений подписчику.

Логическая репликация предоставляет следующие преимущества:

  • получение изменения без задержек в реальном времени;
  • фильтрация получаемых изменений по таблицам и операциям (INSERT/DELETE/UPDATE);
  • полнота и целостность данных, получаемых подписчиком. Подписчик получает изменения в том же порядке, как они происходили в БД;
  • нет потери данных в случае временной остановки подписчика. PostgreSQL запоминает, где остановилась репликация;

Для работы с логической репликацией необходим плагин, который декодирует WAL записи от сервера в более удобный формат.
До версии PostgreSQL 10 можно использовать расширение/extension pglogical_output plugin.
Начиная с PostgreSQL 10 pgoutput plugin.
В этой статье будем рассматривать pgoutput plugin.

На стороне PostgreSQL необходимо выполнить следующие шаги:

  • Выставить параметры для поддержки логической репликации в
    postgresql.conf

    wal_level = 'logical'
    max_replication_slots = 5
    max_wal_senders = 5

  • Роль должна иметь атрибут REPLICATION или SUPERUSER. Создать роль, которая будет использоваться для репликации.

    CREATE USER epgl_test WITH REPLICATION PASSWORD 'epgl_test';

  • Разрешить доступ для этой роли в pg_hba.conf c database = replication

    host replication epgl_test 127.0.0.1/32 trust

  • При создании публикации мы указываем таблицы, которые мы планируем получать в приложении Erlang Создать публикацию/publication.

    CREATE PUBLICATION epgl_test FOR TABLE public.test_table1, public.test_table3; ALTER PUBLICATION epgl_test ADD TABLE public.test_table2; -- добавить таблицу в уже существующую публикацию

На основе этой библиотеки мы и будем строить логику получения изменений в Erlang.
Так как формат непосредственно данных в сообщении XlogData протокола зависит от того, какой плагин используется для слота репликации, библиотека EPGSQL не декодирует данные, а вызывает Callback-метод или посылает сообщение процессу асинхронно. Не так давно поддержка протокола потоковой репликации была добавлена в популярную Erlang библиотеку для работы с PostgreSQL EPGSQL.

Подключение к БД

Должно быть создано специальное репликационное соединение с БД, для этого надо передать флаг replication.
В рамках репликационного соединение к БД можно выполнять только репликационные команды (например DROP_REPLICATION_SLOT, CREATE_REPLICATION_SLOT).
Выполнить обычный запрос через это соединение нельзя.

Создание репликационного слота

Репликационный слот используются для отслеживания текущей позиции переданного WAL-лога.
При создании репликационного слота задаётся плагин для декодирования.

С версии PostgreSQL 10 появилась возможность создавать временные репликационные слоты, которые автоматически удаляются при закрытии репликационного соединения.

Удалять старые/не используемые репликационные слоты крайне важно, потому что PostgreSQL не удаляет WAL логи пока подписчики всех репликационных слотов не получат изменения. Если приложение считывает начальное состояние таблиц каждый раз при старте, то я рекомендую использовать временные репликационные слоты, в этом случае не надо будет заботиться об удалении созданных репликационных слотов (DROP_REPLICATION_SLOT). Если остался не активный репликационный слот, то WAL логи начнут накапливаться и рано или поздно произойдёт переполнение файловой системы.

Получение начального состояния таблиц

предыдущий шаг), автоматически создаётся snapshot, который показывает состояние базы данных на момент создания слота. При создании репликационного слота (см. Этот snapshot может быть использован для загрузки начального состояния таблиц, которое было на начало репликации.

Snapshot доступен только пока репликационное соединение, в котором была выполнена команда CREATE_REPLICATION_SLOT не закрыто.

В этом соединении устанавливаем snapshot SET TRANSACTION SNAPSHOT SnapshotName и извлекаем нужные данные. Для загрузки начальных данных должно быть создано новое обычное/не репликационное соединение к БД, так как выполнить SELECT в репликационном соединении нельзя.

Запуск репликации

При запуске репликации передаём дополнительные параметры для плагина, для pgoutput это имя созданной публикации. Запускаем репликацию для созданного репликационного слота.

Все шаги вместе

start_replication() -> %% Создание репликационного соединения = epgsql:connect(Host, User, Password, [{database, DBName}, {port, Port}, {replication, "database"}]), %% Создание репликационного слота {ok, _, [{_, _, SnapshotName}|_]} = epgsql:squery(ReplConn, "CREATE_REPLICATION_SLOT epgl_repl_slot TEMPORARY LOGICAL pgoutput"). %% Получение начального состояния таблиц {ok, NormalConn} = epgsql:connect(Host, User, Password, [{database, DBName}, {port, Port}]), {ok, _, _} = epgsql:squery(NormalConn, "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ"), {ok, _, _} = epgsql:squery(NormalConn, ["SET TRANSACTION SNAPSHOT '", SnapshotName, "'"]), %% select/load data epgsql:equery(NormalConn,... epgsql:close(NormalConn), %% Запуск репликации ReplSlot = "epgl_repl_slot", Callback = ?MODULE, CbInitState = #{}, WALPosition = "0/0", PluginOpts = "proto_version '1', publication_names '\"epgl_test\"'", ok = epgsql:start_replication(ReplConn, ReplSlot, Callback, CbInitState, WALPosition, PluginOpts). handle_x_log_data(StartLSN, EndLSN, Data, CbState) -> io:format("~p~n", [{StartLSN, EndLSN, Data}]), {ok, EndLSN, EndLSN, CbState}.

Есть два варианта взаимодействия с библиотекой EPGSQL:

  • В качестве Callback передаётся имя модуля. Синхронный. Функция должна возвращать LastFlushedLSN, LastAppliedLSN, который посылается в ответ PostgreSQL, чтобы отслеживать текущее положение репликационного слота. Библиотека для полученных данных будет вызывать функцию CallbackModule:handle_x_log_data. В своих проектах мы используем только этот вариант;

  • В качестве Callback передаётся pid процесса, который будет получать сообщения вида {epgsql, self(), {x_log_data, StartLSN, EndLSN, WALRecord}}. Асинхронный. После обработки процесс должен сообщить обработанный LSN через вызов epgsql:standby_status_update(Conn, FlushedLSN, AppliedLSN);

Или воспользоваться библиотекой с GitHub, которая реализует декодирование для двух плагинов и упрощает выполнение репликационных команд. Дополнительно, чтобы использовать описанный подход, необходимо реализовать декодирование сообщений из формата плагина репликационного слота в более привычные для Erlang структуры.

Показать больше

Похожие публикации

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *

Кнопка «Наверх»