Хабрахабр

Apache Spark — достоинства, недостатки, пожелания

Мне давно хотелось изложить свои впечатления об Apache Spark, и тут как раз попалась на глаза вот эта статья от сотрудника Pivotal Robert Bennett, опубликованная совсем недавно, 26 июня 2018.

Это не будет перевод, а скорее все-таки мои впечатления и комментарии на тему.

Что делает Spark популярным?

Цитата:

It does in-memory, distributed and iterative computation, which is particularly useful when working with machine learning algorithms. It’s easy to see why Apache Spark is so popular. Other tools might require writing intermediate results to disk and reading them back into memory, which can make using iterative algorithms painfully slow.

Начнем с того, что это все по большей части не совсем правда. In memory? Ну да, Spark будет стараться, но то, что написано тут про other tools, точно также будет иметь место. В конце концов, память, ядра процессора и сеть — ресурсы ограниченные, так что рано или поздно любой инструмент упирается в их пределы.

Так или иначе данные все равно должны либо оказаться на диске (кроме всего прочего, это позволит спокойнее пережить ошибки, и не начинать вычисления с самого начала), либо переданы по сети (shuffle и другие процессы). В некотором смысле Spark ни разу не более in-memory, чем любой классический map-reduce. Захотите ли вы их сохранить в память, если данных скажем терабайт? Я уже не говорю, что вам как программисту мало что помешает выполнить persist, и сохранить промежуточные результаты на диск, если вы вдруг захотите. Сомневаюсь.

А уж окончательное быстродействие, в конечном счете, скорее зависит от прямизны рук того, кто пишет программу. Я бы скорее сказал, что в отличие от других инструментов (под которыми обычно понимается классический map-reduce), Spark позволяет чуть меньше думать об оптимальном использовании ресурсов, и больше оптимизирует это использование сам.

Далее автор перечисляет такие качества Spark, которые кажутся ему наилучшими:

Привлекательный API и ленивое выполнение (Appealing APIs and Lazy Execution)

В целом я с этим согласен. Spark как средство разработки намного удобнее классического map-reduce, и несколько удобнее инструментов типа Apache Crunch и других инструментов из условного «второго» поколения. А также несколько более гибкий, чем например Hive, и не ограничен SQL как таковым.

Иногда было бы лучше, если бы скажем различия в схемах Hive и DataSet диагностировались не тогда, когда все данные уже были обработаны, а чуть раньше, и падало все не через пару часов/сутки, а при запуске. Ленивое же исполнение — это не всегда хорошо.

Простота преобразования (Easy Conversion)

Тут автор в основном имел в виду преобразования между структурами Spark и Python/Pandas. Я от этого далек, поэтому не стану высказываться. Возможно про pySpark расскажу чуть ниже.

Простота трансформаций (Easy Transformations)

This method speeds up joins significantly when one of the tables is smaller than the other and can fit in its entirety on individual machines. Another asset of Spark is the “map-side join” broadcast method. This also helps mitigate problems from skew. The smaller one gets sent to all nodes so the data from the bigger table doesn’t need to be moved around. If the big table has a lot of skew on the join keys, it will try to send a large amount of data from the big table to a small number of nodes to perform the join and overwhelm those nodes.

Не знаю, что у них там в питоне, но в наших краях map-side join несложно делается либо голыми руками, либо любым из инструментов типа Crunсh. Не вижу в этом каких-то особых преимуществ, это многие умеют, вот и Hive например. При де-факто отсутствии индексов в экосистеме Hadoop map side join пожалуй один из основных инструментов оптимизации join вообще.

Скажем, «старый» RDD API, будучи наверное немного более гибким, дает в тоже время больший простор для совершения ошибки, особенно если вы работаете не на уровне классов фиксированной структуры (Java Beans), а Row и с гибкой структурой данных. API же для трансформации достаточно удобен, хотя и неоднороден. Расхождение между реальной и ожидаемой Spark схемами — вполне обычное при этом дело.

После некоторой практики вполне можно писать все на нем также легко, как на SQL, дополняя его своими UDF, и добиваясь большей гибкости. Что же до API DataSet, то я бы сказал что он очень хорош. Сами UDF при этом пишутся проще, чем для Hive, и какие-то сложности возникают разве что при возврате из них сложных структур данных (массивы, map, struct), да и то в Java и скорее потому, что ожидаются структуры для Scala.

Или геокодер. Скажем, мне удавалось достаточно легко использовать в виде UDF такую штуку, как Java порт pymorphy2. В сущности, все что нужно — это правильно инициализировать свою UDF, помня про особенности сериализации Spark.

Это не значит, что он плохой — он просто иной. А вот API Spark ML, с другой стороны, выглядит так, как будто его проектировали совсем другие люди.

Open Source Community

The community improves the core software and contributes practical add-on packages. Spark has a massive open-source community behind it. Previously, a user would either have to use other software or rely on slow user-defined functions to leverage Python packages such as Natural Language Toolkit.
For example, a team has developed a natural language processing library for Spark.

Тут в общем добавить нечего. Сообщество реально большое, квалифицированное и дружелюбное. Пишется огромное число расширений для Spark.

Очередной пассаж про slow UDF оставим на совести питониста — Scala/Java UDF вовсе не такие медленные, и при этом весьма удобные.

Что я бы добавил от себя:

Разработка на разных языках

Наверное, одной из причин популярности является и поддержка нескольких языков разработки (Scala, Java, Python и R). По большому счету, API для разных языков примерно одинаково удобен, но я бы не назвал эту поддержку идеальной. Скажем, запуская свое Spark приложение, вы сразу выбираете между Java/Scala и Python, и не можете комбинировать языки в рамках одного запуска. Таким образом, интеграция между частями приложения на pySpark (на котором часто пишут ML или NLP части), и Java/Scala реально возможна только через файлы/базы данных. Ну или что-то типа Kafka, REST и пр. варианты.

Streaming

Spark Streaming (не путать с Hadoop Streaming, который совсем для другого), это еще одна привлекательная часть возможностей Spark. Если описать ее в одном предложении, то это обработка данных потоковых, приходящих скажем из Kafka, ZeroMQ и т.п. теми же средствами, что и данные, взятые из базы.

вам практически не придется ничего менять в программе, чтобы начать обрабатывать данные из Kafka. Вся прелесть именно в том, что средства именно те же самые, т.е. Ни map reduce, ни Crunch, ни Cascading какой-нибудь вам такой фокус провернуть не позволят.

Недостатки

У каждого свои недостатки (с). С какими проблемами вы можете столкнуться при работе со Spark?

Cluster Management

That means ensuring top performance so that it doesn’t buckle under heavy data science workloads is challenging. Spark is notoriously difficult to tune and maintain. Jobs failing with out-of-memory errors is very common and having many concurrent users makes resource management even more challenging.
If your cluster isn’t expertly managed, this can negate “the Good” as we described above.

А разве кто-то обещал? Собственно, я уже писал выше, что все замечательно и просто может быть ровно в одном случае — если у вас либо задачка не очень большая, либо ресурсов сколько угодно — или иными словами, задача не слишком сложная.

В остальных же случаях, каковых очевидно большинство, Spark приложения нужно тюнить, настраивать и поддерживать.

How many of your cluster’s cores do you allow Spark to use? Do you go with fixed or dynamic memory allocation? How many partitions should Spark use when it shuffles data? How much memory does each executor get? Getting all these settings right for data science workloads is difficult.

Скажем, казалось бы сравнительно простая задача выбора числа executors. В принципе, зная кое-что про свои данные, можно это число спокойно рассчитать. Но в условиях, когда ресурсы используете не только вы, все становится намного веселее. Если же ваш процесс включает еще и обращения к другим приложениям, то…

И им занимается отдельный сервер ArcGIS. Например, у меня есть приложение, частью функциональности которого является обратное геокодирование. А если мы перекладываем эту задачу на Spark (переписав предварительно код приложения), то время работы сокращается на пару порядков — благодаря тому, что мы можем использовать ресурсы кластера и для этой задачи тоже. При этом ArcGIS имеет в своем распоряжении всего 4 ядра, а кластер Hadoop, где выполняется Spark, имеет десятки узлов, в итоге если мы просто выделяем Spark всего-то 8 executors, то кривая загрузки процессора ArcGIS подскакивает до 100%, где и остается на пару часов работы приложения.

Соответственно, ожидать от Spark, что он оптимизирует использование этих ресурсов, было бы наивно. То есть, у нас зачастую имеется бутылочное горлышко, где либо выделен фиксированный объем ресурсов, либо управление этими ресурсами осуществляется другим способом (на который Spark не может повлиять).

Отладка (Debugging)

Это чистая правда. Ожидаемая, впрочем. Мы имеем распределенную параллельную систему, отладка и мониторинг которой представляют собой нетривиальную задачу. SparkUI в какой-то степени решает вопросы наблюдения, а Spark Metrics — измерения производительности, но попробуйте, скажем, подключиться к исполняемому приложению отладчиком — вы не знаете ни хост, где оно работает, ни порт, который окажется свободным для подключения. Те же метрики, которые для обычного приложения могут быть легко получены например из JMX, в случае приложения распределенного должны передаваться по сети, и только потом могут быть собраны. Да, с этим все относительно плохо.

Плохое быстродействие UDF в PySpark (Slowness of PySpark UDFs)

Ну что я тут могу сказать? За что боролись — на то и напоролись (с). Насколько я понимаю, UDF на питоне приводит к тому, что происходит двойное преобразование данных между приложением и UDF. Просто потому, что питон все-таки чуждый язык для экосистемы JVM, на которой работает Spark, и UDF исполняется вне ее.

Понятно, что не всегда этому совету хочется и можно следовать, но боюсь что решить эту проблему глобально сможет разве что Graal, когда его версию питона доведут до промышленного уровня. Тут можно посоветовать только одно — не пишите на питоне, пишите на Scala/Java.

Сложно гарантировать максимальный уровень параллелизма (Hard-to-Guarantee Maximal Parallelism)

Spark tries to elastically scale how many executors a job uses based on the job’s needs, but it often fails to scale up on its own. One of Spark’s key value propositions is distributed computation, yet it can be difficult to ensure Spark parallelizes computations as much as possible. Also, Spark divides RDDs (Resilient Distributed Dataset)/DataFrames into partitions, which is the smallest unit of work that an executor takes on. So if you set the minimum number of executors too low, your job may not utilize more executors when it needs them. Also, fewer partitions means larger partitions, which can cause executors to run out of memory.
If you set too few partitions, then there may not be enough chunks of work for all the executors to work on.

Если бы все было так просто. Начнем с простого — параметры для запуска следует тюнить для каждого конкретного кластера. Prod кластер может иметь на порядок больше узлов, и в разы больше памяти, доступной на каждом. Настройки для Dev кластера вероятно будут занижены при запуске на Prod. Все это еще больше усложняется, если начать учитывать текущую загрузку кластера задачами. В общем виде эта задача выделения ресурсов кластера является задачей оптимизации, достаточно нетривиальной, и не имеет единственного верного решения.

А если их слишком много — то размер каждой может оказаться ниже, чем некоторый условный нижний предел, вроде размера блока HDFS. Если партиций мало, то параллелизм недостаточен. Поскольку каждая задача — это ресурсы, затраченные на ее запуск, очевидно есть нижний предел размеров задачи, ниже которого опускаться не стоит, потому что накладные расходы растут быстрее, чем производительность.

Если в случае «обычной» задачи map-reduce на Hadoop мы как правило доставляем код к данным, т.е. Простой пример — приложение, которому нужен некоторый значительный объем справочников. И вот внезапно размер данных, доставляемых на каждый узел вырос на пару порядков — было например 10 мегабайт (небольшое Spark приложение, без самого Spark), стало например 20 гигабайт (вполне реальный случай, справочники, нужные для нормализации адресов, телефонов и пр. копируем наше приложение + части Spark на узлы кластера, где лежит наш файл (файлы), то справочники — это уже похоже на map side join, и их нужно доставлять вместе с кодом. Ну и вот она — цена излишнего параллелизма, налицо. данных вполне тянут на такой объем).

Вероятно, что это число близко к оптимальному с точки зрения чтения данных. Возможно существует некоторое естественное число партиций, которое определяется числом блоков, на которые разбит наш входной файл, с учетом коэффициента репликации. Разумеется, Spark учитывает эти параметры, когда динамически распределяет ресурсы. То есть, если у нас в файле три блока, и каждый блок имеет копии на 2 узлах кластера, то мы естественным образом можем параллельно вести обработку в 6 потоков, обрабатывая каждую реплику на своем узле.

Им является например Yarn. К сожалению или к счастью, Spark не является планировщиком ресурсов кластера. Так что у Spark просто может быть недостаточно информации, чтобы оптимально планировать использование всех ресурсов.

Не слишком хорошая интеграция с Hive

С одной стороны, Spark отлично работает с данными и метаданными Hive. Я бы сказал, что большая часть приложений, что мне попадались, как раз этим и занимается. Но не обходится без досадных проблем. Скажем, если вы попробуете воспользоваться в Spark его средствами partitionBy и bucketBy, очень велика вероятность, что Hive результаты вашей работы не увидит. При этом все что вы получите — это невнятный warning где-нибудь в логах.

Совместимость

К сожалению, мой опыт говорит на эту тему скорее плохое. Мы натыкались на множественные проблемы при попытках запускать приложения на кластерах, где версия Spark отличалась от ожидаемой. При разработке на Spark 2.2.0 проблемы были как при запуске на 2.1, так и на 2.3.

3 один из codecs (а именно snappy). Скажем, в нашем случае Spark почему-то не мог найти при запуске на версии 2. Это не слишком серьезная проблема, если вам нужно записать данные (вы можете указать кодек при записи, и выбрать любой, включая не упакованные данные), но если вам нужно прочитать что-то, что упаковано snappy, то вам явно не повезло.

Все-таки, мне кажется что миграция между минорными версиями должна была бы быть более плавной. Возможно, какие-то из проблем были вызваны ошибками в установщике, но сильно от этого не легче.

2 и 2. Ну и увы, но Spark не предполагает штатную параллельную установку на один кластер двух разных версий одной линейки (те же 2. 3).

Ужасные стороны

API Awkwardness

For example, we consider accessing array elements to be an ugly part of Spark life.
Since much of the Spark API is so elegant, the inelegant parts really stand out.

Не сказал бы, что работа с массивами так уж ужасна. Некоторые неудобства приносит тот факт, что Spark API изначально сделан на Scala, а там своя структура коллекций, которую работая из Java приходится приводить к скаловской. А так, если вы способны написать UDF, то вы способны делать с массивами все что угодно. А, ну да — в питоне же все плохо с UDF, все время забываю.

Это пытается решить новая на сегодня версия Spark 2. Не очень удобно и не слишком эффективно — да, возможно. 4, где введены новые функции высшего порядка для работы со сложными структурами (что позволит избежать применения explode/collect).

При этом механизм распространения кода по узлам предполагает его сериализацию (тем или иным способом), и тот код, который работает на executors, должен быть сериализуемым. На мой взгляд, намного более неудобной стороной API является то, что глядя на код, далеко не всегда очевидно, какая именно часть будет выполняться на driver, а какие — на других узлах. Разбираясь с ошибками сериализации вы сможете узнать много нового и интересного о своем коде :).

Classloaders

К сожалению, вопрос изоляции кода приложения от кода Spark решен недостаточно хорошо. Впрочем, тоже самое относится и к классическим map-reduce приложениям Hadoop. При этом код Hadoop использует некоторые древние версии такой библиотеки, как Google Guava, да и другие библиотеки далеко не новы, прямо скажем. Если вспомнить, что авторы Guava любят вносить в свой API обратную несовместимость, удаляя deprecated методы, то мы получаем совершенно дурацкую картину — вы пишете свой код под Guava свежей версии, запускаете, и оно падает — либо потому, что реально вы работаете с версией Guava из Hadoop (намного более старой), и ваш код не находит методов из новой версии, либо Hadoop падает, потому что несовместим с версией новой. Это достаточно типичная, к сожалению проблема, с которой сталкивается наверное каждый второй разработчик. Библиотека Apache Http Components — еще один пример подобной проблемы.

SQL без bind variables

Увы, но типичный код для выполнения запроса на спарке выглядит вот так:

val sqlDF = spark.sql(«SELECT * FROM people WHERE id= 1»)

и подстановкой параметров при каждом выполнении. В API не предусмотрено варианта для выполнения запроса id=? Объективности ради, этим же страдает и Hive, где тоже нельзя определить запрос с параметрами. Ну ладно, допустим проблема SQL-injection авторов не беспокоит, но параметры в запрос должны подставить разработчики, соответственно, замена спецсимволов — целиком на нас с вами.

Неформально можно оказывается написать вместо таблицы что-то типа (select a, b, c from d) t, но будет ли это работать во всех случаях — никто вам точно не расскажет. Впрочем, что еще смешнее, для JDBC источников формально нельзя даже написать запрос — можно только указать таблицу, но не колонки.

Lack of Maturity and Feature Completeness

Мда. Чужая голова — потемки.

A sequential, unique index column is helpful for some types of analysis. Another example feature gap is difficulty creating sequential unique record identifiers with Spark. If consecutive IDs are important to you, then you may need to use Spark’s older RDD format. According to the documentation, “monotonically_increasing_id()” generates a unique ID for each row, but does not guarantee that the IDs are consecutive.

Не понимаю таких претензий. Исходники же доступны, и вполне можно заглянуть, и хотя бы прочитать комментарии:

Returns monotonically increasing 64-bit integers.

  • The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive.
  • The current implementation puts the partition ID in the upper 31 bits, and the lower 33 bits
  • represent the record number within each partition. The assumption is that the data frame has
  • less than 1 billion partitions, and each partition has less than 8 billion records.

Ну т.е., эта функция просто берет номер партиции, и добавляет к нему счетчик. Естественно, вам не гарантируют, что никто не будет вызывать ее между вашими двумя последовательными вызовами. Одно Spark приложение — это потенциально множество JVM, работающих на разных узлах кластера, и вероятно множество потоков выполнения внутри одной JVM.

А теперь пусть автор подумает еще немного, и попробует придумать способ генерировать в этой параллельной и распределенной системе именно такие id, какие ему хочется, не создав при этом единой точки генерации (которая будет заведомо бутылочным горлышком), и без блокировок (которые будут тем же самым).

Что мы ждем от Spark 2.4

Уже упомянутые функции высшего порядка

Это реально хорошо. Главное чтобы работало.

По сути, это набор встроенных функций для работы с массивами или map-ами, а также возможность выполнять трансформации над ними при помощи собственных функций (лябмд).

вот тут можно посмотреть некоторые примеры использования.

Новый режим исполнения

Это так называемый barier планировщик и режим выполнения. Авторы предназначают его для задач машинного обучения, но набор таких задач конечно несколько шире. По сути, это такие задачи, которые не являются обычными для Spark map-reduce. Насколько я это понял — это по большей части компоненты обмена сообщениями, которые запускаются один раз, или в случае аварийного их завершения.

Скажем, в нашей компании такие компоненты оформляются как Yarn-приложения, и работают от Spark несколько отдельно. Если API для поддержки таких задач будет удобным — то потребность в нем точно есть. Более тесная и удобная интеграция в рамках Spark была бы нелишней.

Улучшенная поддержка Avro

Поддержка Avro в общем и так была неплохой. Поддержаны некоторые дополнительные типы данных, а именно так называемые «логические типы» (по сути — некие производные типы), куда входят Decimal, Date, Time, Duration и другие.

Это и сейчас можно, но с Avro это выглядит и работает удобнее. Я, откровенно говоря, больше жду когда авторы Hive (ну и Spark заодно) научатся лучше поддерживать паркет, создавая таблицы на основе его схемы.

Поддержка Scala 2.12 (экспериментальная)

Казалось бы, мне, как в основном Java программисту, это безразлично, однако же в рамках этого проекта обещали улучшить взаимодействие с Java 8, например сериализацию лямбд, что было бы очень и очень приятно.

Показать больше

Похожие публикации

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *

Кнопка «Наверх»