Главная » Хабрахабр » Продуктовая аналитика ВКонтакте на базе ClickHouse

Продуктовая аналитика ВКонтакте на базе ClickHouse

Понимать, делаем мы своими изменениями лучше или хуже, корректировать направление развития продукта, опираясь не на интуицию и собственные ощущения, а на метрики и цифры, в которые можно верить. Развивая любой продукт, будь то видеосервис или лента, истории или статьи, хочется уметь измерять условное «счастье» пользователя.

Речь пойдёт о ClickHouse, используемых движках и особенностях запросов. В этой статье я расскажу, как нам удалось запустить продуктовую статистику и аналитику на сервисе с 97-миллионной месячной аудиторией, получив при этом чрезвычайно высокую производительность аналитических запросов. Я опишу подход к агрегации данных, который позволяет нам за доли секунды получать сложные метрики, и расскажу о преобразовании и тестировании данных.

А дальше — не такими быстрыми темпами поднимемся до 40–50 миллиардов к концу года, когда опишем все интересующие нас продуктовые события. Сейчас у нас около 6 миллиардов продуктовых событий в сутки, в ближайшее время дойдём до 20–25 миллиардов.

Elapsed: 0. 1 rows in set. Processed 59. 287 sec. 85 GB (208. 85 billion rows, 59. 16 GB/s.) 16 billion rows/s., 208.

Подробности под катом.

Аналитические инструменты были ВКонтакте и раньше. Уникальные пользователи считались, можно было строить графики событий по срезам и тем самым проваливаться вглубь сервиса. Однако речь шла о фиксированных заранее срезах, об агрегированных данных, об HLL для уников, о некоторой скованности и невозможности отвечать быстро на вопросы чуть более сложные, чем «сколько?».

К сожалению, hdfs применялся лишь некоторыми командами для реализации собственных задач. Конечно, был, есть и будет hadoop, в него тоже писалось, пишется и будет писаться много, очень много логов использований сервисов. Ещё более печально, что hdfs — не про быстрые аналитические запросы: ко многим полям были вопросы, ответы на которые приходилось искать в коде, а не в доступной всем документации.

Данными должна обладать каждая команда, запросы над ними должны быть быстрыми, а сами данные — точными и богатыми на полезные параметры. Мы пришли к выводу, что так жить больше нельзя.

Поэтому мы сформулировали понятные требования к новой системе статистики/аналитики:

  • аналитические запросы должны быть быстрыми;
  • данные — достаточно точными, в идеале это сырые события пользовательского взаимодействия с сервисом;
  • структура событий должна быть описана, понятна и доступна;
  • обеспечивается надёжность хранения данных, гарантия однократной доставки;
  • есть возможность считать уников, аудиторию (дневную, недельную, месячную), метрики удержания, проведённое пользователем в сервисе время, квантили действий на уника и другие метрики по множеству срезов;
  • выполняется тестирование, преобразование данных и визуализация.

Опыт подсказывал, что мы нуждаемся в двух базах: медленной, где мы бы агрегировали и обогащали данные, и быстрой, где с этими данными можно было бы работать и поверх которой строить графики. Это один из самых распространённых подходов, при котором в медленной базе, например, в hdfs, строятся разные проекции — на уников и на количество событий по срезам на определённый промежуток в времени.

Было много сомнений, связанных в первую очередь со скоростью и надёжностью: заявленные тесты производительности казались нереальными, а новые релизы базы периодически ломали имеющуюся функциональность. Тёплым сентябрьским днём, при разговоре за чашкой чая на кухне с видом на Казанский собор, у нас возникла идея попробовать ClickHouse в качестве быстрой базы — на тот момент мы уже использовали его для хранения технических логов. Потому предложение было простым — пробовать.

Мы развернули кластер из двух машин с такой конфигурацией:
2xE5-2620 v4 (32 ядра в сумме), 256G ram, 28T места (raid10 с ext4).

В ClickHouse есть множество различных движков таблиц, но основные — из семейства MergeTree. Изначально был near layout, но затем мы перешли на far. Выбрали ReplicatedReplacingMergeTree с примерно такими настройками:

PARTITION BY dt ORDER BY (toStartOfHour(time), cityHash64(user_id), event_microsec, event_id)
SAMPLE BY cityHash64(user_id)
SETTINGS index_granularity = 8192;

Replicated — означает, что таблица реплицируемая, и это решает одно из наших требований к надёжности.

Replacing — таблица поддерживает дедупликацию по первичному ключу: по умолчанию первичный ключ совпадает с ключом сортировки, так что секция ORDER BY как раз и говорит о том, что такое первичный ключ.

SAMPLE BY — семплирование тоже хотелось попробовать: sample возвращает равномерно-псевдослучайную выборку.

Мы его не меняли. index_granularity = 8192 — это магическое число строк данных между засечками индекса (да, он разреженный), которое используется по умолчанию.

Множество запросов к данным, как предполагалось, будут внутридневными — например, построить минутный график просмотров видео за определённый день. Партиционирование сделали по дням (хотя по умолчанию — по месяцам).

Прекрасное сжатие, группировки по колонкам типа Int*, подсчёты уникальных значений — всё работало невероятно быстро! Далее мы взяли кусок технических логов и наполнили таблицу примерно миллиардом строк.

И это на двух машинах — причём подсчётами, по сути, занималась только одна. Говоря о скорости, имею в виду, что ни один запрос не длился дольше 500 мс, а большинство из них укладывались в 50–100 мс.

И поняли, что ClickHouse нам полностью подходит, если сделать всё правильно. Мы посмотрели на это всё и представили, что вместо колонки UInt8 будет id страны, а колонку Int8 заменят данные, например, о возрасте пользователя.

Польза от ClickHouse начинается ровно тогда, когда сформирована правильная схема данных. Пример: platform String — плохо, platform Int8 + словарь — хорошо, LowCardinality(String) — удобно и хорошо (о LowCardinality я расскажу чуть позже).

Поясню на примере схемы, которая получилась: Мы создали специальный класс-генератор в php, который по запросу создаёт классы-обёртки над событиями на основании таблиц в ClickHouse, и единую точку входа в логирование.

  1. Аналитик/дата-инженер/разработчик описывает документацию: какие поля, возможные значения, события необходимо логировать.
  2. Cоздаётся таблица в ClickHouse в соответствии со структурой данных из предыдущего пункта.
  3. Генерятся классы-обёртки для событий на основании таблицы.
  4. Продуктовой командой реализуется заполнение полей объекта этого класса, отправка.

Изменить схему на уровне php и тип логируемых данных не получится, не изменив прежде таблицу в ClickHouse. А это в свою очередь нельзя сделать без согласования с командой, изменения документации и описания событий.

Настройки нужны в первую очередь для постепенной раскатки с возможностью вырубить логирование, если что-то пойдёт не так. Для каждого события можно выставить две настройки, которые регулируют процент отправляемых событий в ClickHouse и hadoop соответственно. А в ClickHouse они летят через схему с KittenHouse в persistent-режиме, гарантирующем хотя бы однократную доставку события. До hadoop данные доставляются по стандартной схеме с использованием Kafka.

Далее буферная таблица сбрасывает данные в локальную ReplicatedReplacingMergeTree. Событие доставляется в буферную таблицу на нужный шард, исходя из остатка от деления некоторого hash от user_id на количество шардов в кластере. А поверх локальных таблиц натянута распределённая таблица с движком Distributed, которая позволяет обращаться к данным со всех шардов.

ClickHouse — столбцовая СУБД. Она не про нормальные формы, а значит лучше иметь всю информацию прямо в событии, чем join-ить. Join-ы тоже есть, но если правая таблица не помещается в память, начинается боль. Поэтому мы приняли волевое решение: вся интересующая нас информация должна храниться в самом событии. Например, пол, возраст пользователя, страна, город, день рождения — всё то, что является публичной информацией, которая может пригодиться для аналитики аудитории, а также все полезные сведения об объекте взаимодействия. Если, например, речь идёт о видео — это video_id, video_owner_id, дата загрузки видео, длина, качество в момент события, максимальное качество и так далее.

Например, лог ошибки error_log — на самом деле ошибкой мы называем выход за диапазоны типа. В общей сложности в каждой таблице у нас от 50 до 200 колонок, при этом во всех таблицах есть сервисные поля. На случай, если в поле с возрастом польются странные значения, выходящие за размеры типа.

Тип LowCardinality(T)

В ClickHouse есть возможность использовать внешние словари. Они хранятся в памяти, периодически обновляются, могут эффективно применяться в различных сценариях, в том числе как классические справочники. Например, вы хотите логировать операционную систему и у вас есть две альтернативы: строка или число + справочник. Само собой, на больших объёмах данных, да и для высокой производительности аналитических запросов, логично писать число, а строковое представление получать из словаря, когда понадобится:

dictGetString('os', 'os_name', toUInt64(os_id))

Но есть намного более удобный способ — использовать тип LowCardinality(String), который автоматически строит словарь. Производительность работы с LowCardinality при условии низкой кардинальности множества значений радикально выше, чем со String.

Или для platform: 'web', 'android', 'iphone': Мы, например, используем LowCardinality(String) для типов событий 'play', 'pause', 'rewind'.

SELECT vk_platform, count()
FROM t
WHERE dt = yesterday()
GROUP BY vk_platform
Elapsed: 0.145 sec. Processed 1.98 billion rows, 5.96 GB (13.65 billion rows/s., 41.04 GB/s.)

Фича пока экспериментальная, так что для её использования необходимо выполнять:

SET allow_experimental_low_cardinality_type = 1;

Но есть ощущение, что через какое-то время она окажется уже не под настройкой.
Так как колонок много, а событий очень много, то естественное желание — подрезать «старые» партиции, но прежде — собирать агрегаты. Изредка бывает нужно проанализировать сырые события (месячной или годичной давности), поэтому мы не подрезаем данные в hdfs — любой аналитик может обратиться к нужному паркету за любую дату.

Это накладывает ограничения: страны начинают собирать в группы типа 'Россия', 'Азия', 'Европа', 'Остальной мир', а возрасты — в интервалы, чтобы понизить размерность до условного миллиона строк на дату. Как правило, при агрегации во временной интервал мы всегда упираемся в то, что количество строк на единицу времени равно произведению мощностей срезов.

Агрегация по dt, user_id

Но ведь у нас реактивный ClickHouse! Можем разогнаться до 50−100 млн строк на дату?
Быстрые тесты показали, что можем, и в этот момент возникла простая идея — оставить пользователя в агрегате. А именно — агрегировать не по «дата, срезы» средствами spark, а по «дата, пользователь» средствами ClickHouse, производя при этом некоторое «транспонирование» данных.

Можем соединять агрегаты, считая общие аудитории нескольких сервисов вплоть до всей аудитории ВКонтакте. При таком подходе мы сохраняем пользователей в агрегированных данных, а значит по-прежнему можем считать аудиторные показатели, метрики удержания и частотности. Всё это можно делать по любому срезу, который присутствует в таблице за условно то же время.

Проиллюстрирую на примере:

После агрегации (ещё много колонок справа):

Для полей с информацией по пользователю при такой агрегации можно использовать функции any, anyHeavy (выбирает часто встречающееся значение). При этом агрегация происходит именно по (dt, user_id). При желании можно использовать groupUniqArray(platform) и хранить массив всех платформ, с которых пользователь вызывал событие. Можно, например, собрать в агрегат anyHeavy(platform), чтобы знать, с какой платформы в основном пользователь вызывает видеособытия. Если и этого мало, можно завести отдельные колонки под платформы и хранить, например, количество уникальных видео, досмотренных до половины с конкретной платформы:

uniqCombinedIf(cityHash64(video_owner_id, video_id), (platform = 'android') AND (event = '50p')) as uniq_videos_50p_android

При таком подходе получается достаточно широкий агрегат, в котором каждая строка является уникальным пользователем, а каждая колонка содержит информацию или по пользователю, или по его взаимодействию с сервисом.

Получается, чтобы посчитать DAU сервиса, достаточно выполнить такой запрос поверх его агрегата:

SELECT dt, count() as DAU
FROM agg
GROUP BY dt
Elapsed: 0.078 sec.

Или вычислить, сколько дней пользователи присутствовали в сервисе за неделю:

SELECT days_in_service, count() AS uniques
FROM
( SELECT uniqUpTo(7)(dt) AS days_in_service FROM agg2 WHERE dt > (yesterday() - 7) GROUP BY user_id
)
GROUP BY days_in_service
ORDER BY days_in_service ASC
7 rows in set. Elapsed: 2.922 sec.

Можем ускорить семплированием, при этом почти не теряя в точности:

SELECT days_in_service, 10 * count() AS uniques
FROM
( SELECT uniqUpTo(7)(dt) AS days_in_service FROM agg2 SAMPLE 1 / 10 WHERE dt > (yesterday() - 7) GROUP BY user_id
)
GROUP BY days_in_service
ORDER BY days_in_service ASC
7 rows in set. Elapsed: 0.454 sec.

Стоит сразу отметить, что семплирование идёт не по проценту событий, а по проценту пользователей — и в результате становится невероятно мощным инструментом.

Или то же за 4 недели с семплированием 1/100 — примерно на 1% менее точные результаты получаются.

SELECT days_in_service, 100 * count() AS uniques
FROM
( SELECT uniqUpTo(7)(dt) AS days_in_service FROM agg2 SAMPLE 1 / 100 WHERE dt > (yesterday() - 28) GROUP BY user_id
)
GROUP BY days_in_service
ORDER BY days_in_service ASC
28 rows in set. Elapsed: 0.287 sec.

Агрегация с другой стороны

При агрегации по (dt, user_id) мы не теряем пользователя, не сильно упускаем информацию о его взаимодействии с сервисом, однако, несомненно, утрачиваем метрики о конкретном объекте взаимодействия. Но ведь можно и это не терять — давайте построим агрегат по
(dt, video_owner_id, video_id), придерживаясь тех же идей. Максимально сохраняем информацию о видео, не сильно упускаем данные о взаимодействии видео с пользователем и совсем пропускаем информацию о конкретном пользователе.

SELECT starts
FROM agg3
WHERE (dt = yesterday()) AND (video_id = ...) AND (video_owner_id = ...)
1 rows in set. Elapsed: 0.030 sec

Или топ-10 видео по просмотрам за вчера:

SELECT video_id, video_owner_id, watches
FROM video_agg_video_d1
WHERE dt = yesterday()
ORDER BY watches DESC
LIMIT 10
10 rows in set. Elapsed: 0.035 sec.

В итоге у нас есть схема агрегатов вида:

  • агрегация по «дата, пользователь» в рамках продукта;
  • агрегация по «дата, объект взаимодействия» в рамках продукта;
  • иногда возникают и другие проекции.

Стоит напоследок сказать пару слов про инфраструктуру. Сбор агрегатов у нас запускается ночью, стартуя с OPTIMIZE над каждой из таблиц с сырыми данными, чтобы вызвать внеочередное слияние данных в ReplicatedReplacingMergeTree. Операция может длиться достаточно долго, однако она необходима для удаления дублей, если они возникли. Тут стоит отметить, что пока я ещё ни разу не сталкивался с дубликатами, но нет гарантий, что они не появятся в будущем.

Это bash-скрипты, в которых происходит следующее: Следующий шаг — создание агрегатов.

  • сначала мы получаем количество шардов и какой-нибудь хост из шарда:

    SELECT shard_num, any(host_name) AS host
    FROM system.clusters
    GROUP BY shard_num

  • затем скрипт выполняет последовательно для каждого шарда (clickhouse-client -h $host) запрос вида (для агрегатов по пользователям):

    INSERT INTO ... SELECT ... FROM ... SAMPLE 1/$shards_count OFFSET 1/$shard_num

Это не совсем оптимально и может порождать много сетевого взаимодействия между хостами. Однако при добавлении новых шардов всё продолжает работать из коробки, сохраняется локальность данных для агрегатов, поэтому мы решили не сильно из-за этого переживать.

Я бы не сказал, что это суперудобный инструмент, но со своей задачей прекрасно справляется, в том числе когда речь идёт о выстраивании чуть более сложных pipeline-ов и когда одному скрипту нужно дождаться выполнения нескольких других. В качестве планировщика заданий у нас выступает Azkaban.

Суммарное время, которое уходит на преобразование существующих сейчас событий в агрегаты, — 15 минут.

Тестирование

Каждое утро у нас запускаются автоматизированные тесты, которые отвечают на вопросы относительно сырых данных, а также готовности и качества агрегатов: «Проверь, что за вчера данных или уников по какому-нибудь срезу в сырых данных или в агрегатах не стало меньше более чем на полпроцента по сравнению с тем же днём неделю назад».

Прогон всех тестов запускается в TeamCity и выполняется около 30 секунд в 1 поток, а в случае fail-ов мы получаем уведомления ВКонтакте от нашего замечательного TeamCity-бота. Технологически это обычные unit-тесты с использованием JUnit и реализации jdbc-драйвера к ClickHouse.

Используйте только стабильные версии ClickHouse, и ваши волосы будут мягкими и шелковистыми. Стоит ещё добавить, что ClickHouse не тормозит.


Оставить комментарий

Ваш email нигде не будет показан
Обязательные для заполнения поля помечены *

*

x

Ещё Hi-Tech Интересное!

Когда шифрование не поможет: рассказываем про физический доступ к устройству

В феврале мы опубликовали статью «Не VPN-ом единым. Шпаргалка о том, как обезопасить себя и свои данные». Один из комментариев побудил нас написать продолжение статьи. Эта часть — вполне автономный источник информации, но всё же рекомендуем ознакомиться с обоими постами. ...

[Из песочницы] Buildroot — часть 1. Общие сведения, сборка минимальной системы, настройка через меню

Введение Здесь будет практический опыт создания небольшой ОС с графическим интерфейсом и минимальным функционалом. В данной серии статей я хочу рассмотреть систему сборки дистрибутива buildroot и поделиться опытом её кастомизации. Buildroot может собрать систему из набора пакетов, которые ему предложили. ...