Хабрахабр

Apache Ignite + Apache Spark Data Frames: вместе веселее

Привет, Хабр! Меня зовут Николай Ижиков, я работаю в компании «Сбербанк Технологии» в команде развития Open Source решений. За плечами 15 лет коммерческой разработки на Java. Я коммитер Apache Ignite и контрибьютор Apache Kafka.

Под катом вас ожидает видео и текстовая версия моего доклада на Apache Ignite Meetup о том, как использовать Apache Ignite вместе с Apache Spark и какие возможности мы для этого реализовали.

Что умеет Apache Spark

Что такое Apache Spark? Это продукт, который позволяет быстро выполнять распределенные вычисления и аналитические запросы. В основном, Apache Spark написан на Scala.

Одна из особенностей продукта — универсальный SQL-like движок запросов к данным, получаемым из различных источников. У Apache Spark богатый API для подключения к различным системам хранения или получения данных. Если у вас несколько источников информации, вы хотите их объединить и получить какие-то результаты, Apache Spark — это то, что вам нужно.

В терминах реляционной базы — это таблица, некий источник, который предоставляет данные в структурированном виде. Одной из ключевых абстракций, которую предоставляет Spark, являются Data Frame, DataSet. Data Frame'ы могут быть созданы из различных источников. Известна структура, тип каждого столбца, его название и т.п. В качестве примеров можно привести json-файлы, реляционные базы данных, различные hadoop-системы, а также Apache Ignite.

Можно объединять данные из различных источников и получать результаты, выполнять аналитические запросы. Spark поддерживает join'ы в SQL-запросах. Когда вы выполнили запросы, провели исследование, то Spark предоставляет возможность сохранить результаты в тот приёмник, который поддерживает такую возможность, и, соответственно, решить задачу по обработке данных. Кроме того, есть API для сохранения данных.

Какие возможности интеграции Apache Spark с Apache Ignite мы реализовали

  1. Чтение данных из SQL-таблиц Apache Ignite.
  2. Запись данных в SQL-таблицы Apache Ignite.
  3. IgniteCatalog внутри IgniteSparkSession — возможность использовать все существующие SQL-таблицы Ignite'а без регистрации «руками».
  4. SQL Optimization — возможность выполнять SQL-операторы внутри Ignite.

Apache Spark умеет читать данные из Apache Ignite SQL-таблиц и записывать их в виде такой таблицы. Любой DataFrame, который сформирован в Spark, можно сохранить в виде SQL-таблицы Apache Ignite.

Apache Ignite позволяет использовать все существующие SQL-таблицы Ignite в Spark Session без регистрации «руками» — с помощью IgniteCatalog внутри расширения стандартной SparkSession — IgniteSparkSession.

В терминах обычной базы данных каталог — это место, в котором хранится мета-информация: какие таблицы доступны, какие в них столбцы и т.п. Тут надо немного углубиться в устройство Spark. По умолчанию в Spark все прочитанные таблицы (не важно, из реляционной базы данных, Ignite, Hadoop) приходится вручную регистрировать в сессии. Когда поступает запрос, из каталога подтягивается мета-информация и SQL-движок что-то делает с таблицами, данными. Spark про них узнает. В результате вы получаете возможность сделать SQL-запрос над этими таблицами.

Но вместо регистрации каждой таблицы «руками» мы реализовали возможность автоматически получать доступ ко всем таблицам Ignite. Чтобы  работать с данными, которые мы загрузили в Ignite, нам нужно зарегистрировать таблицы.

По непонятной мне причине, каталог в Spark – это internal API, т.е. В чем здесь особенность? И, поскольку Spark вышел из Hadoop, он поддерживает только Hive. сторонний человек не может прийти и создать свою имплементацию каталога. Пользователи часто спрашивают, как можно это обойти и сразу делать SQL-запросы. А все остальное вы должны регистрировать руками. И пробросить наружу internal API они тоже не дали. Я реализовал каталог, который позволяет обозревать и обращаться к таблицам Ignite без регистрации ~и sms~, и первоначально предложил этот патч в Spark community, на что получил ответ: такой патч не интересен по каким-то внутренним причинам.

Чтобы использовать этот каталог, у нас есть своя имплементация сессии, Это обычная SparkSession, внутри которой можно делать запросы, обрабатывать данные. Сейчас Ignite-каталог — это интересная фича, реализованная с использованием внутреннего API Spark'а. Отличия состоят в том, что мы встроили в неё ExternalCatalog для работы с таблицами Ignite, а также IgniteOptimization, о котором будет рассказано ниже.

По умолчанию при выполнении join, группировки, расчёта агрегатов, других сложных SQL-запросов Spark читает данные в режиме row by row. SQL Optimization — возможность выполнять SQL-операторы внутри Ignite. Единственное, что «умеет» источник данных, это эффективно отфильтровать строки.

В случае Ignite это неоптимально, потому что сам Ignite имеет распределенную архитектуру и обладает знаниями о данных, которые в нём хранятся. Если используется join или группировка, Spark вытаскивает все данные из таблицы к себе в память на worker, применяя заданные фильтры, и только потом группирует их или выполняет другие SQL-операции. Кроме того, данных может быть много, и для их группировки нужно будет вычитать всё, поднять все данные в Spark, что довольно затратно. Поэтому сам Ignite может эффективно подсчитать агрегаты, провести группировку.

Это будет эффективно с точки зрения скорости, а также расхода памяти, потому что мы не будем использовать ее, чтобы вытянуть данные, которые будут тут же сгруппированы. В Spark предусмотрен API, при помощи которого можно изменять первоначальный план SQL запроса, выполнить оптимизацию и пробросить внутрь Ignite ту часть SQL-запроса, которая может быть там выполнена.

Как все работает

Zookeeper нет, поскольку нод всего пять. У нас есть кластер Ignite — это нижняя половина картинки. Через нее мы можем сделать запрос и прочитать данные, взаимодействовать с кластером. Есть спарковские worker'ы, внутри каждого worker'а поднимается клиентская нода Ignite. Также клиентская нода поднимается внутри IgniteSparkSession для работы каталога.

Ignite Data Frame

Переходим к коду: как прочитать данные из SQL-таблицы? В случае Spark все достаточно просто и хорошо: говорим, что хотим посчитать какие-то данные, указываем формат – это определенная константа. Дальше у нас есть несколько опций — путь к конфигурационному файлу для клиентской ноды, которая запускается при чтении данных. Мы указываем, какую таблицу хотим прочитать, и говорим Spark загрузить. Получаем данные и можем с ними делать, что хотим.

spark.read .format(FORMAT_IGNITE) .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE) .option(OPTION_TABLE, "person") .load()

После того, как мы сформировали данные — необязательно из Ignite, можно из любого источника – мы можем так же просто все сохранить, указав формат и соответствующую таблицу. Командуем Spark записать, указываем формат. В конфиге прописываем, к какому кластеру коннектиться. Указываем таблицу, в которую хотим сохранить. Дополнительно можем прописать служебные опции — указать primary key, который мы на этой таблице создаем. Если данные просто апендятся без создания таблицы, то этот параметр не нужен. В конце жмем save и данные записываются.

tbl.write. format(FORMAT_IGNITE). option(OPTION_CONFIG_FILE, CFG_PATH). option(OPTION_TABLE, tableName). option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, pk). save

Теперь давайте посмотрим, как это все работает.


LoadDataExample.scala

Я выбрал для примера данные по футбольным матчам, скачал статистику с известного ресурса. Это очевидное приложение сначала продемонстрирует возможности записи. Тут содержится информация по турнирам: лиги, матчи, игроки, команды, атрибуты игроков, атрибуты команд — данные, которые описывают футбольные матчи в лигах европейских стран (Англия, Франция, Испании и т.п.).

Мы создаем сессию Spark, указываем адрес мастера и вызываем загрузку этих таблиц, передавая параметры. Я хочу загрузить их в Ignite. Пример на Scala, а не на Java, потому что Scala менее многословна и так лучше для примера.

Потом записываем в Ignite. Передаем имя файла, читаем его, указываем, что он мультилайн, это стандартный json-файл. Если все проходит гладко, создается таблица, в которой есть все нужные поля нужных типов данных. Структуру нашего файла нигде не описываем — Spark сам определяет, какие данные у нас лежат и какая у них структура. Вот так мы можем загрузить все внутрь Ignite.

В качестве простого примера — запрос, который позволяет узнать, какая команда сыграла больше всех матчей. Когда данные загрузятся, мы их сможем увидеть в Ignite и сразу использовать. Выбираем, группируем, считаем count, суммируем и джойним с данными по команде – чтобы ввести имя команды. У нас есть две колонки: hometeam и awayteam, хозяева и гости. Видим Пари Сен-Жермен, Тулузу — по французским командам у нас оказалось много данных. Та-дам – и данные с json-чиков у нас попали в Ignite.

Мы сейчас загрузили данные из источника, json-файла, в Ignite, причем достаточно быстро. Резюмируем. Схема таблицы взята из json-файла в исходном виде. Возможно, с точки зрения big data это не слишком большой объем, но для локального компьютера прилично. ID везде есть, и первичный ключ является ID. Таблица создалась, названия столбцов скопировались из исходного файла, создан первичный ключ. Эти данные попали в Ignite, мы можем их использовать.

IgniteSparkSession и IgniteCatalog

Посмотрим, как это работает.


CatalogExample.scala

В прошлом примере мы запускали стандартную спарковскую сессию. Достаточно простым способом вы можете получить доступ и делать запросы ко всем вашим данным.  Но, если вы хотите получить доступ к Ignite-таблицам автоматически, можно использовать наше расширение. И никакой специфики Ignite там не было — кроме того, что вы должны подложить jar с нужным источником данных — совершенно стандартная работа через public API. Отличие состоит в том, что вместо SparkSession мы пишем IgniteSparkSession.

Можете посмотреть их схему и всю информацию. Как только вы создаете обьект IgniteSparkSession, то видите в каталоге все таблицы, которые только что загрузили в Ignite. Spark уже знает про таблицы, которые есть в Ignite, и вы можете легко получить все данные.

IgniteOptimization

Когда вы делаете сложные запросы в Ignite с использованием JOIN, сначала Spark вытаскивает данные, и только потом JOIN их группирует. Чтобы оптимизировать процесс, мы сделали фичу IgniteOptimization — она оптимизирует план запросов Spark и позволяет пробросить внутрь Ignite те части запроса, которые могут быть выполнены внутри Ignite. Покажем оптимизацию на конкретном запросе.

SQL Query: SELECT city_id, count(*) FROM person p GROUP BY city_id HAVING count(*) > 1

Выполняем запрос. У нас есть таблица person — какие-то сотрудники, люди. У каждого сотрудника известен ID города, в котором он живет. Мы хотим узнать, сколько человек живет в каждом городе. Фильтруем – в каком городе живет больше одного человека. Вот изначальный план, который строит Spark:

== Analyzed Logical Plan ==
city_id: bigint, count(1): bigint
Project [city_id#19L, count(1)#52L]
+- Filter (count(1)#54L > cast(1 as bigint)) +- Aggregate [city_id#19L], [city_id#19L, count(1) AS count(1)#52L, count(1) AS count(1)#54L] +- SubqueryAlias p +- SubqueryAlias person +- Relation[NAME#11,BIRTH_DATE#12,IS_RESIDENT#13,SALARY#14,PENSION#15,ACCOUNT#16,AGE#17,ID#18L,CITY_ID#19L] IgniteSQLRelation[table=PERSON]

Relation – это как раз Ignite-таблица. Нет никаких фильтров — мы просто выкачиваем по сети из кластера все данные из таблицы Person. Потом Spark все это агрегирует — в соответствии с запросом и вернет результат запроса.

Это будет гораздо эффективней, чем вытягивать все данные из потенциально большой таблицы в Spark — этим и занимается наша фича IgniteOptimization. Легко видеть, что все это поддерево с фильтром и агрегацией может быть исполнено внутри Ignite. После анализа и оптимизации дерева мы получаем следующий план:

== Optimized Logical Plan ==
Relation[CITY_ID#19L,COUNT(1)#52L] IgniteSQLAccumulatorRelation( columns=[CITY_ID, COUNT(1)], qry=SELECT CITY_ID, COUNT(1) FROM PERSON GROUP BY city_id HAVING count(1) > 1)

В итоге, у нас получается всего один relation, так как мы оптимизировали все дерево. И внутри уже видно, что на Ignite пойдет запрос, который достаточно близок к изначальному запросу.

В этом случае в плане будет оптимизировано только поддерево. Предположим, мы джойнимся с разными источниками данных: например, один DataFrame у нас из Ignite, второй из json, третий опять из Ignite, а четвертый — из какой-то  реляционной базы. За счет этого мы получаем выигрыш по скорости. Мы оптимизируем, что можем, закидываем это в Ignite, а все остальное уже будет делать Spark.

Другой пример с JOIN:

SQL Query - SELECT jt1.id as id1, jt1.val1, jt2.id as id2, jt2.val2
FROM jt1 JOIN jt2 ON jt1.val1 = jt2.val2

У нас есть две таблицы. Мы джойнимся по значению и выбираем из них все — ID, значения. Spark предлагает вот такой вот план:

== Analyzed Logical Plan ==
id1: bigint, val1: string, id2: bigint, val2: string
Project [id#4L AS id1#84L, val1#3, id#6L AS id2#85L, val2#5]
+- Join Inner, (val1#3 = val2#5) :- SubqueryAlias jt1 : +- Relation[VAL1#3,ID#4L] IgniteSQLRelation[table=JT1] +- SubqueryAlias jt2 +- Relation[VAL2#5,ID#6L] IgniteSQLRelation[table=JT2]

Видим, что он будет вытаскивать все данные из одной таблицы, все данные из второй, джойнить их внутри себя и выдавать результаты. После обработки и оптимизации мы получаем ровно такой же запрос, который уходит в Ignite, где сравнительно быстро исполняется.

== Optimized Logical Plan ==
Relation[ID#84L,VAL1#3,ID#85L,VAL2#5]
IgniteSQLAccumulatorRelation(columns=[ID, VAL1, ID, VAL2],
qry= SELECT JT1.ID AS id1, JT1.VAL1, JT2.ID AS id2, JT2.VAL2 FROM JT1 JOIN JT2 ON JT1.val1 = JT2.val2 WHERE JT1.val1 IS NOT NULL AND JT2.val2 IS NOT NULL)

Покажу еще пример.


OptimizationExample.scala

Здесь запрос такой: найти игроков с наибольшим рейтингом и вывести их имена. Мы создаем IgniteSpark-сессию, в которую уже автоматически включены все наши возможности оптимизации. Мы джойнимся, фильтруем мусорные данные и выводим игроков с наибольшим рейтингом. В таблице player — их атрибуты и данные. Посмотрим, какой план у нас получился после оптимизации, и покажем результаты этого запроса.

Видим знакомые фамилии: Месси, Буффон, Роналду и т.д. Запускаем. Любителям футбола может показаться странным, что в списке фигурируют неизвестные игроки. Кстати, некоторые почему-то в двух ипостасях встречаются — и Месси, и Роналду. Теперь смотрим на план запроса, который был выполнен. Это вратари, игроки с довольно высокими характеристиками — на фоне других игроков. В Spark почти ничего не выполнялось, то есть мы весь запрос отправили опять же в Ignite.

Развитие Apache Ignite

Наш проект – это open source-продукт, поэтому мы всегда рады патчам и обратной связи от разработчиков. Ваша помощь, обратная связь, патчи очень приветствуются. Мы ждем их. На 90% Ignite-комьюнити – русскоязычно. Например, для меня, пока я не начал рабоать над Apache Ignite, не самое лучшее знание английского языка было сдерживающим фактором. По-русски вряд ли стоит на dev-лист писать, но даже если вы напишите что-то не так, вам ответят и помогут.

Чем можно помочь, если у вас есть такое желание? Что можно улучшить по этой интеграции? Звездочки обозначают сложность. Список ниже.

Выше я показал несколько очевидных запросов.
Для проверки оптимизации нужно писать тесты со сложными запросами.  Это совсем простой таск — приходите и делайте. Понятно, что если написать много группировок и много джойнов, то что-то может упасть. Там уже будет сложнее. Если мы найдем какие-то баги по итогам тестов, их надо будет исправить.

Он изначально способен указать какие-то наборы IP-адресов, и этого достаточно, чтобы присоединиться к Ignite-кластеру, что удобно в случае интеграции с внешней системой. Еще одна понятная и интересная задача – интеграция Spark с тонким клиентом. Если вдруг захотите присоединиться к решению этой задачи, я лично буду по ней помогать.

Если вы захотели присоединиться к сообществу Apache Ignite, ниже полезные ссылки:

У нас отзывчивый дев-лист, на котором вам обязательно помогут. Он еще далек от идеала, но в сравнении с другими проектами действительно живой.

Если вы знаете Java или С++, ищете работу и хотите разрабатывать Open Source (Apache Ignite, Apache Kafka, Tarantool и т.п.) пишите сюда: join-open-source@sberbank.ru.

Показать больше

Похожие публикации

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *

Кнопка «Наверх»