Главная » Хабрахабр » #PostgreSQL. Ускоряем деплой в семь раз с помощью «многопоточки»

#PostgreSQL. Ускоряем деплой в семь раз с помощью «многопоточки»

Всем привет! Мы на проекте ГИС ЖКХ используем PostgreSQL и недавно столкнулись с проблемой долгого выполнения SQL скриптов из-за быстрого увеличения объема данных в БД. В феврале 2018 года на PGConf я рассказал, как мы решали эту проблему. Слайды презентации доступны на сайте конференции. Предлагаю вашему вниманию текст моего выступления.

Дано

Про ГИС ЖКХ уже была подробная статья в блоге группы ЛАНИТ на Хабре. Если в двух словах ГИС ЖКХ – это первый в России федеральный портал о всей информации в ЖКХ, который запущен почти во всех регионах (в 2019 году присоединятся Москва, Питер и Севастополь). За последние три месяца в систему было загружено более 12 ТБ данных о домах, лицевых счетах, фактах оплаты и много-много еще чего, а всего в PostgreSQL сейчас лежит уже более 24 ТБ.

Проект архитектурно разделен на подсистемы. Каждой подсистеме выделена отдельная база данных. Всего таких баз сейчас около 60, они размещены на 11 виртуальных серверах. Некоторые подсистемы нагружены сильнее других, и у них базы по объему могут занимать 3-6 терабайт.

ЦУП, у нас проблема

Теперь немного подробнее расскажу о проблеме. Начну издалека: у нас код приложения и код миграций базы данных (под миграцией я понимаю перевод базы данных из одной ревизии в другую с выполнением всех необходимых SQL скриптов для этого) хранятся вместе в системе контроля версий. Это возможно благодаря использованию Liquibase (подробнее про Liquibase на проекте можно узнать из доклада Миши Балаяна на TechGuruDay в ЛАНИТ).

Теперь давайте представим себе выпуск версии. Когда данных всего пара терабайт или меньше и все таблицы в пределах сотни гигабайт, изменения (миграции) любых данных или изменения структуры в любых таблицах проходят быстро (обычно).

А теперь представим, что у нас данных уже пара десятков терабайт и появилось несколько таблиц по терабайту и больше (возможно, разбитых на партиции). В новой версии нам нужно провести миграцию по одной из этих таблиц или еще хуже сразу по всем. И при этом время регламентных работ увеличивать нельзя. И при этом такую же миграцию нужно провести и на тестовых базах данных, где железо слабее. И при этом нужно понять заранее, сколько в сумме все миграции по времени займут. Здесь и начинается проблема.

Сперва мы попробовали советы из официальной документации PostgreSQL (удаление индексов и FK перед массовой миграцией, пересоздание таблиц с нуля, использование copy, динамическое изменение конфига). Это дало эффект, но нам хотелось еще быстрее и удобнее (тут, конечно, дело субъективное – кому как удобно :–)). В результате мы реализовали параллельное выполнение массовых миграций, что увеличило скорость на многих кейсах в разы (а иногда и на порядок). Хотя на самом деле запускается параллельно несколько процессов, внутри команды у нас прижилось слово “многопоточка”.

«Многопоточка»

Основная идея такого подхода заключается в разделении большой таблицы на непересекающиеся диапазоны (например, функцией ntile) и выполнение SQL скрипта не сразу по всем данным, а параллельно по нескольким диапазонам. Каждый параллельный процесс забирает себе один диапазон, блокирует его и начинает выполнять SQL скрипт только для данных из этого диапазона. Как только скрипт отработал, мы опять ищем незаблокированный и еще не обработанный диапазон и повторяем операцию. Важно выбрать правильный ключ для разделения. Это должно быть проиндексированное поле с уникальными значениями. Если такого поля нет, можно использовать служебное поле ctid.

Первая версия «многопоточки» была реализована с помощью вспомогательной таблицы с диапазонами и функции взятия следующего диапазона. Требуемый SQL скрипт подставлялся в анонимную функцию и запускался в требуемом количестве сессий, обеспечивая параллельное выполнение.

Пример кода

-- Таблица UPDATE_INFO_STEPS используется для реализации обновления/заполнения -- больших таблиц, выполнения сложных запросов обновления/заполнения
CREATE TABLE UPDATE_INFO_STEPS ( BEGIN_GUID varchar(36), END_GUID varchar(36) NOT NULL, STEP_NO int, STATUS char(1), BEGIN_UPD timestamp, END_UPD timestamp, ROWS_UPDATED int, ROWS_UPDATED_TEXT varchar(30), DISCR varchar(10)
);
ALTER TABLE UPDATE_INFO_STEPS ADD PRIMARY KEY(discr, step_no); -- Функция FUNC_UPDATE_INFO_STEPS реализует ключевой функционал. -- Возможность "брать" следующий интервал, если текущий занят.
CREATE OR REPLACE FUNCTION func_update_info_steps( pStep_no int, pDiscr varchar(10)
) RETURNS text AS $BODY$ DECLARE lResult text;
BEGIN SELECT 'SUCCESS' INTO lResult FROM update_info_steps WHERE step_no = pStep_no AND discr = pDiscr AND status = 'N' FOR UPDATE NOWAIT; UPDATE UPDATE_INFO_STEPS SET status = 'A', begin_upd = now() WHERE step_no = pStep_no AND discr = pDiscr AND status = 'N'; return lResult; EXCEPTION WHEN lock_not_available THEN SELECT 'ERROR' INTO lResult; return lResult;
END;
$BODY$ LANGUAGE PLPGSQL VOLATILE; -- Пример использования (1 процесс на 1 сессию)
-- Шаг 1. Заполняем служебную таблицу интервалами для обработки.
DO LANGUAGE PLPGSQL $$ DECLARE -- Указать количество обрабатываемых записей за одну итерацию l_count int := 10000; -- Подставить идентификатор l_discr VARCHAR(10) := '<discr>';
BEGIN INSERT INTO UPDATE_INFO_STEPS ( BEGIN_GUID, END_GUID, STEP_NO, STATUS, DISCR ) SELECT min(guid) BEGIN_GUID, max(guid) END_GUID, RES2.STEP STEP_NO, 'N' :: char(1) STATUS, l_discr DISCR FROM ( SELECT guid, floor( (ROWNUM - 1) / l_count ) + 1 AS STEP FROM ( -- Подставить название колонки SELECT <column> AS GUID, -- Подставить название колонки row_number() over ( ORDER BY <column> ) AS ROWNUM FROM -- Подставить схему и название таблицы <schema>.<table_name> ORDER BY 1 -- ) RES1 ) RES2 GROUP BY RES2.step;
END;
$$; -- Шаг 2. Используя служебную таблицу, выполняем скрипт UPDATE.
DO LANGUAGE PLPGSQL $$ DECLARE cur record; vCount int; vCount_text varchar(30); vCurStatus char(1); vCurUpdDate date; -- Подставить идентификатор l_discr varchar(10) := '<discr>'; l_upd_res varchar(100);
BEGIN FOR cur IN ( SELECT * FROM UPDATE_INFO_STEPS WHERE status = 'N' AND DISCR = l_discr ORDER BY step_no ) LOOP vCount := 0; -- Внутренняя транзакция обязательна! SELECT result INTO l_upd_res FROM dblink( '<parameters>', 'SELECT FUNC_UPDATE_INFO_STEPS(' || cur.step_no || ',''' || l_discr || ''')' ) AS T (result text); IF l_upd_res = 'SUCCESS' THEN -- Основной скрипт. В данной секции необходимо выполнять -- требуемые действия по обновлению, вставке и тп. -- Обязательное требование - использовать интервал -- cur.begin_guid - cur.end_guid и dblink на "самого себя". -- Указан примерный скрипт. SELECT dblink( '<parameters>', 'UPDATE FOO set level = 42 WHERE id BETWEEN ''' || cur.begin_guid || ''' AND ''' || cur.end_guid || '''' ) INTO vCount_text; -- Конец основного скрипта. SELECT dblink( '<parameters>', 'update UPDATE_INFO_STEPS SET status = ''P'', end_upd = now(), rows_updated_text = ''' || vCount_text || ''' WHERE step_no = ' || cur.step_no || ' AND discr = ''' || l_discr || '''' ) INTO l_upd_res; END IF; END LOOP;
END;
$$; -- Мониторинг выполнения.
SELECT SUM(CASE status WHEN 'P' THEN 1 ELSE 0 END) done, SUM(CASE status WHEN 'A' THEN 1 ELSE 0 END) processing, SUM(CASE status WHEN 'N' THEN 1 ELSE 0 END) LEFT_, round( SUM(CASE status WHEN 'P' THEN 1 ELSE 0 END):: numeric / COUNT(*)* 100 :: numeric, 2 ) done_proc FROM UPDATE_INFO_STEPS WHERE discr = '<discr>';

Такой подход хоть и работал быстро, но требовал очень большого числа действий руками. И если деплой проходил в 3 часа ночи, ДБА должен был отловить момент выполнения «многопоточного» скрипта в Liquibase (который его выполнял, по сути, в одном процессе) и запустить руками еще несколько процессов для ускорения.

«МноGOпоточка 2.0»

Предыдущая версия «многопоточки» была неудобной в использовании. Поэтому мы сделали приложение на Go, которое автоматизирует процесс (можно сделать и на Python, например, да и на многих других языках).

Сперва мы разбиваем данные в изменяемой таблице на диапазоны. После этого во вспомогательную таблицу задач добавляем информацию о скрипте – его имя (уникальный идентификатор, например, имя задачи в Jira) и количество одновременно запускаемых процессов. Затем во вспомогательную таблицу скриптов добавляем текст SQL миграции, разбитый на диапазоны.

Пример кода

-- В целевой БД необходимо создать объекты, в которых будет храниться -- конфигурация многопоточного обновления (pg_parallel_task)
-- и логи задания (pg_parallel_task_statements).
CREATE TABLE IF NOT EXISTS public.pg_parallel_task ( name text primary key, threads_count int not null DEFAULT 10, comment text
);
COMMENT ON table public.pg_parallel_task IS 'Задание параллельного выполнения';
COMMENT ON COLUMN public.pg_parallel_task.name IS 'Уникальный идентификатор';
COMMENT ON COLUMN public.pg_parallel_task.threads_count IS 'Количество одновременных потоков обработки. По умолчанию 10';
COMMENT ON COLUMN public.pg_parallel_task.comment IS 'Комментарий'; CREATE TABLE IF NOT EXISTS public.pg_parallel_task_statements ( statement_id bigserial primary key, task_name text not null references public.pg_parallel_task (name), sql_statement text not null, status text not null check ( status in ( 'new', 'in progress', 'ok', 'error' ) ) DEFAULT 'new', start_time timestamp without time zone, elapsed_sec float(8), rows_affected bigint, err text
);
COMMENT ON table public.pg_parallel_task_statements IS 'Операторы параллельного выполнения';
COMMENT ON COLUMN public.pg_parallel_task_statements.sql_statement IS 'Полный текст выполняемого запроса';
COMMENT ON COLUMN public.pg_parallel_task_statements.status IS 'Статус обработки текущего оператора. Один из new|in progress|ok|error';
COMMENT ON COLUMN public.pg_parallel_task_statements.start_time IS 'Время начала выполнения текущего оператора';
COMMENT ON COLUMN public.pg_parallel_task_statements.elapsed_sec IS 'Для выполненных операторов, затраченное время в секундах';
COMMENT ON COLUMN public.pg_parallel_task_statements.rows_affected IS 'Для выполненных операторов, количество затронутных строк';
COMMENT ON COLUMN public.pg_parallel_task_statements.err IS 'Для выполненных операторов, текст ошибки. NULL, если выполнение успешно.'; -- Основной скрипт
INSERT INTO PUBLIC.pg_parallel_task (NAME, threads_count) VALUES ('JIRA-001', 10); INSERT INTO PUBLIC.pg_parallel_task_statements (task_name, sql_statement) SELECT 'JIRA-001' task_name, FORMAT( 'UPDATE FOO SET level = 42 where id >= ''%s'' and id <= ''%s''', MIN(d.id), MAX(d.id) ) sql_statement FROM ( SELECT id, NTILE(10) OVER ( ORDER BY id ) part FROM foo ) d GROUP BY d.part;
-- Конец основного скрипта

При деплое происходит вызов приложения на Go, которое считывает конфигурацию задачи и скрипты по этой задаче из вспомогательных таблиц и автоматически запускает скрипты с заданным числом параллельных процессов (worker’ов). После выполнения управление передается обратно в Liquibase.

Код

<changeSet id="JIRA-001" author="soldatov"> <executeCommand os="Linux, Mac OS X" executable="./pgpar.sh"> <arg value="testdatabase"/><arg value="JIRA-001"/> </executeCommand>
</changeSet>

Приложение состоит из трех основных абстракций:

  • task – загружает в память параметры миграции, количество процессов и все диапазоны, запускает “многопоточку” и поднимает Web–сервер для отслеживания прогресса выполнения;
  • statement – представляет собой один диапазон выполняемой операции, также отвечает за изменение статуса выполнения диапазона, запись времени выполнения диапазона, количество строк в диапазоне и т.д.;
  • worker – представляет собой один поток выполнения.

В методе task.do создается канал, в который отправляются все statements операции. На этом канале запускается указанное число worker’ов. Внутри worker’ов бесконечный цикл, он мультиплексирует на двух каналах: по которому получает statements и выполняет их, и пустой канал как сигнализатор? что надо завершиться. Как только пустой канал будет закрыт, worker завершит работу – это случается при ошибке в одном из worker’ов. Т.к. каналы в Go это thread–safe структура, то закрытием одного канала мы можем отменить все worker’ы разом. Когда statement в канале закончится, worker просто выйдет из цикла, и уменьшит общий для всех worker'ов счетчик. Так как task всегда знает, сколько worker’ов по нему работает, он просто ждет, когда этот счетчик обнулится и после этого завершается сам.

Плюшки

За счет такой реализации «многопоточки» появилось несколько интересных фич:

  • Интеграция с Liquibase (вызываем с помощью тега executeCommand).
  • простой веб–интерфейс, который появляется при запуске “многопоточки” и содержит всю информацию о ходе ее выполнения.
  • Прогресс–бар (мы знаем, сколько обрабатывается один диапазон, сколько запущено параллельных процессов и сколько диапазонов еще осталось обработать – значит можем подсчитать время завершения).
  • Динамическое изменение параллельных процессов (пока это мы делаем руками, но в дальнейшем хотим автоматизировать).
  • Логирование информации по ходу выполнения многопоточных скриптов для возможности дальнейшего анализа.
  • Можно выполнять блокирующие операции типа update, почти ничего не блокируя (если разбить табличку на очень маленькие диапазоны, все скрипты будут выполняться почти мгновенно).
  • Есть обертка для вызова “многопоточки” прямо из БД.

Не плюшки

Основным недостатком является необходимость один раз пройти фулсканом по табличке для разбиения ее на диапазоны, если в качестве ключа используется текстовое поле, дата или uid. Если ключом для разбиения выбрано поле с последовательно увеличивающимися плотными значениями, то такой проблемы нет (мы заранее можем указать все диапазоны, просто задав требуемый шаг).

Ускоряемся в семь раз (тест на pgbench таблице)

Напоследок приведу пример сравнения по скорости выполнения операции UPDATE 500 000 000 строк без использования «многопоточки» и с ней. Простой UPDATE выполнялся 49 минут, тогда как «многопоточка» завершилась за семь минут.

Пример кода

SELECT count(1) FROM pgbench_accounts; count
-------
500000000
(1 row) SELECT pg_size_pretty(pg_total_relation_size('pgbench_accounts'));
pg_size_pretty
---------------- 62 Gb
(1 row) UPDATE pgbench_accounts
SET abalance = 42;
-- Время выполнения 49 минут vacuum full analyze verbose pgbench_accounts; INSERT INTO public.pg_parallel_tASk (name, threads_count) values ('JIRA-002', 25); INSERT INTO public.pg_parallel_tASk_statements (tASk_name, sql_statement)
SELECT 'JIRA-002' tASk_name, FORMAT('UPDATE pgbench_accounts SET abalance = 42 WHERE aid >= ''%s'' AND aid <= ''%s'';', MIN(d.aid), MAX(d.aid)) sql_statement
FROM (SELECT aid, ntile(25) over (order by aid) part FROM pgbench_accounts) d
GROUP BY d.part;
-- Время выполнения 10 минут -- Можно дробить по ctid, но получится неравномерно и нужно чтобы эту таблицу никто не изменял в процесе многопоточки
INSERT INTO public.pg_parallel_tASk_statements (tASk_name, sql_statement)
SELECT 'JIRA-002-ctid' tASk_name, FORMAT('UPDATE pgbench_accounts SET abalance = 45 WHERE (ctid::text::point)[0]::text > ''%s'' AND (ctid::text::point)[0]::text <= ''%s'';', (d.min_ctid), (d.max_ctid)) sql_statement
FROM ( WITH max_ctid AS ( SELECT MAX((ctid::text::point)[0]::int) FROM pgbench_accounts) SELECT generate_series - (SELECT max / 25 FROM max_ctid) AS min_ctid, generate_series AS max_ctid FROM generate_series((SELECT max / 25 FROM max_ctid), (SELECT max FROM max_ctid), (SELECT max / 25 FROM max_ctid))) d;
-- Время выполнения 9 мин ./pgpar-linux-amd64 jdbc:postgresql://localhost:5432 soldatov password testdatabase JIRA-002 -- Время выполнения 7 минут

P.S. Вам это надо, если:

Все инструменты хороши для определенных задач, и вот несколько таких для «многопоточки».

  • UPDATE таблиц > 100 000 строк.
  • UPDATE со сложной логикой, которую можно распараллелить (например, вызов функций для вычисления чего-либо).
  • UPDATE без локов. За счет дробления на очень маленькие диапазоны и запуска небольшого числа процессов можно добиться мгновенной обработки каждого диапазона. Таким образом, блокировка тоже будет почти мгновенной.
  • Параллельное выполнение changeSet’ов в Liquibase (например, VACUUM).
  • Создание и заполнение данными новых полей в таблице.
  • Сложные отчеты.

Почти неблокирующий UPDATE (50 000 диапазонов по 10 000 строк каждый)

<changeSet author="soldatov" id="JIRA-002-01"> <sql> <![CDATA[ INSERT INTO public.pg_parallel_task (name, threads_count) VALUES ('JIRA-002', 5); INSERT INTO public.pg_parallel_task_statements (task_name, sql_statement) SELECT 'JIRA-002' task_name, FORMAT( 'UPDATE pgbench_accounts SET abalance = 42 WHERE filler IS NULL AND aid >= ''%s'' AND aid <= ''%s'';', MIN(d.aid), MAX(d.aid) ) sql_statement FROM ( SELECT aid, ntile(10000) over ( order by aid ) part FROM pgbench_accounts WHERE filler IS NULL ) d GROUP BY d.part; ]]> </sql>
</changeSet> <changeSet author="soldatov" id="JIRA-002-02"> <executeCommand os="Linux, Mac OS X" executable="./pgpar.sh"> <arg value="pgconfdb"/><arg value="JIRA-002"/> </executeCommand>
</changeSet>

Параллельные changeSet’ы в Liquibase

<changeSet author="soldatov" id="JIRA-003-01"> <sql> <![CDATA[ INSERT INTO pg_parallel_task (name, threads_count) VALUES ('JIRA-003', 2); INSERT INTO pg_parallel_task_statements (task_name, sql_statement) SELECT 'JIRA-003' task_name, 'VACUUM FULL ANALYZE pgbench_accounts;' sql_statement; INSERT INTO pg_parallel_task_statements (task_name, sql_statement) SELECT 'JIRA-003' task_name, 'VACUUM FULL ANALYZE pgbench_branches;' sql_statement; ]]> </sql>
</changeSet> <changeSet author="soldatov" id="JIRA-003-02"> <executeCommand os="Linux, Mac OS X" executable="./pgpar.sh"> <arg value="testdatabase"/><arg value="JIRA-003"/> </executeCommand>
</changeSet>

Почти неблокирующее заполнение нового поля таблицы данными (50 000 диапазонов по 10 000 строк каждый) с вызовом «многопоточки» функцией из БД

-- SQL part
ALTER TABLE pgbench_accounts ADD COLUMN account_number text; INSERT INTO public.pg_parallel_task (name, threads_count) VALUES ('JIRA-004', 5); INSERT INTO public.pg_parallel_task_statements (task_name, sql_statement)
SELECT 'JIRA-004' task_name, FORMAT('UPDATE pgbench_accounts SET account_number = aid::text || filler WHERE aid >= ''%s'' AND aid <= ''%s'';', MIN(d.aid), MAX(d.aid)) sql_statement
FROM (SELECT aid, ntile(50000) over (order by device_version_guid) part FROM pgbench_accounts) d
GROUP BY d.part; SELECT * FROM func_run_parallel_task('testdatabase','JIRA-004');


x

Ещё Hi-Tech Интересное!

[Перевод] Python Developer Tools от Microsoft. Начало работы

Последние несколько лет специалисты Microsoft трудились над тем, чтобы добавить поддержку инструментов разработчика Python в одни из наших самых популярных продуктов: Visual Studio Code и Visual Studio. В этом году все заработало. В статье мы познакомимся с инструментами разработчика Python ...

[Перевод] Вселенная, соответствующая нашим текущим представлениям, может оказаться невозможной

Новая физическая гипотеза бросает вызов лидирующей «теории всего» Один заголовок поразил его так, что он сбросил все остатки сна. 25 июня физик Тимм Вразе [Timm Wrase], живущий в Вене, проснулся, и сонно листал в онлайне список недавно опубликованных физических работ. ...