Вопрос-ответ

Resolving dependency problems in Apache Spark

Решение проблем с зависимостями в Apache Spark

Распространенными проблемами при создании и развертывании приложений Spark являются:


  • java.lang.ClassNotFoundException.

  • object x is not a member of package y ошибки компиляции.

  • java.lang.NoSuchMethodError

Как их можно решить?

Переведено автоматически
Ответ 1

Путь к классам Apache Spark создается динамически (для учета пользовательского кода каждого приложения), что делает его уязвимым для подобных проблем. ответ @User правильный, но есть еще некоторые проблемы, в зависимости от менеджера кластеров ("master"), который вы используете.

Во-первых, приложение Spark состоит из этих компонентов (каждый из которых является отдельной JVM, поэтому потенциально содержит разные классы в своем пути к классам):


  1. Драйвер: это ваше приложение, создающее SparkSession (или SparkContext) и подключающееся к менеджеру кластеров для выполнения фактической работы

  2. Диспетчер кластеров: служит "точкой входа" в кластер, отвечает за распределение исполнителей для каждого приложения. В Spark поддерживается несколько различных типов: standalone, YARN и Mesos, которые мы опишем ниже.

  3. Исполнители: это процессы на узлах кластера, выполняющие фактическую работу (запускающие задачи Spark)

Взаимосвязь между ними описана на этой диаграмме из обзора кластерного режима Apache Spark:

Обзор кластерного режима

Теперь - какие классы должны находиться в каждом из этих компонентов?

На этот вопрос можно ответить с помощью следующей диаграммы:

Обзор размещения классов

Давайте разберем это медленно:


  1. Код Spark - это библиотеки Spark. Они должны присутствовать во ВСЕХ трех компонентах, поскольку они включают связующее звено, которое позволяет Spark осуществлять связь между ними. Кстати, авторы Spark приняли проектное решение включить код для ВСЕХ компонентов во ВСЕ компоненты (например, включить код, который должен выполняться только в Executor, и в драйвер), чтобы упростить это - поэтому "fat jar" (в версиях до 1.6) или "archive" (в версии 2.0, подробности приведены ниже) Spark содержат необходимый код для всех компонентов и должны быть доступны во всех из них.


  2. Код только для драйвера это пользовательский код, который не включает ничего, что должно использоваться на исполнителях, т. Е. Код, который не используется ни в каких преобразованиях в RDD / DataFrame / Dataset. Это необязательно должно быть отделено от распространяемого пользовательского кода, но может быть.


  3. Распределенный код это пользовательский код, который компилируется с кодом драйвера, но также должен выполняться на исполнителях - все, что используют фактические преобразования, должно быть включено в этот jar (ы).


Теперь, когда мы разобрались с этим, как нам добиться правильной загрузки классов в каждом компоненте и каким правилам они должны следовать?


  1. Spark Code: as previous answers state, you must use the same Scala and Spark versions in all components.


    1.1 In Standalone mode, there's a "pre-existing" Spark installation to which applications (drivers) can connect. That means that all drivers must use that same Spark version running on the master and executors.


    1.2 In YARN / Mesos, each application can use a different Spark version, but all components of the same application must use the same one. That means that if you used version X to compile and package your driver application, you should provide the same version when starting the SparkSession (e.g. via spark.yarn.archive or spark.yarn.jars parameters when using YARN). The jars / archive you provide should include all Spark dependencies (including transitive dependencies), and it will be shipped by the cluster manager to each executor when the application starts.


  2. Driver Code: that's entirely up to - driver code can be shipped as a bunch of jars or a "fat jar", as long as it includes all Spark dependencies + all user code


  3. Distributed Code: in addition to being present on the Driver, this code must be shipped to executors (again, along with all of its transitive dependencies). This is done using the spark.jars parameter.


To summarize, here's a suggested approach to building and deploying a Spark Application (in this case - using YARN):


  • Create a library with your distributed code, package it both as a "regular" jar (with a .pom file describing its dependencies) and as a "fat jar" (with all of its transitive dependencies included).

  • Create a driver application, with compile-dependencies on your distributed code library and on Apache Spark (with a specific version)

  • Package the driver application into a fat jar to be deployed to driver

  • Pass the right version of your distributed code as the value of spark.jars parameter when starting the SparkSession

  • Pass the location of an archive file (e.g. gzip) containing all the jars under lib/ folder of the downloaded Spark binaries as the value of spark.yarn.archive

Ответ 2

When building and deploying Spark applications all dependencies require compatible versions.


  • Scala version. All packages have to use the same major (2.10, 2.11, 2.12) Scala version.


    Consider following (incorrect) build.sbt:


    name := "Simple Project"

    version := "1.0"

    libraryDependencies ++= Seq(
    "org.apache.spark" % "spark-core_2.11" % "2.0.1",
    "org.apache.spark" % "spark-streaming_2.10" % "2.0.1",
    "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
    )

    We use spark-streaming for Scala 2.10 while remaining packages are for Scala 2.11. A valid file could be


    name := "Simple Project"

    version := "1.0"

    libraryDependencies ++= Seq(
    "org.apache.spark" % "spark-core_2.11" % "2.0.1",
    "org.apache.spark" % "spark-streaming_2.11" % "2.0.1",
    "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
    )

    but it is better to specify version globally and use %% (which appends the scala version for you):


    name := "Simple Project"

    version := "1.0"

    scalaVersion := "2.11.7"

    libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % "2.0.1",
    "org.apache.spark" %% "spark-streaming" % "2.0.1",
    "org.apache.bahir" %% "spark-streaming-twitter" % "2.0.1"
    )

Similarly in Maven:

    <project>
<groupId>com.example</groupId>
<artifactId>simple-project</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>Simple Project</name>
<packaging>jar</packaging>
<version>1.0</version>
<properties>
<spark.version>2.0.1</spark.version>
</properties>
<dependencies>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>spark-streaming-twitter_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
</project>

  • Spark version All packages have to use the same major Spark version (1.6, 2.0, 2.1, ...).


    Consider following (incorrect) build.sbt:


    name := "Simple Project"

    version := "1.0"

    libraryDependencies ++= Seq(
    "org.apache.spark" % "spark-core_2.11" % "1.6.1",
    "org.apache.spark" % "spark-streaming_2.10" % "2.0.1",
    "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
    )

    We use spark-core 1.6 while remaining components are in Spark 2.0. A valid file could be


    name := "Simple Project"

    version := "1.0"

    libraryDependencies ++= Seq(
    "org.apache.spark" % "spark-core_2.11" % "2.0.1",
    "org.apache.spark" % "spark-streaming_2.10" % "2.0.1",
    "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
    )

    but it is better to use a variable
    (still incorrect):


    name := "Simple Project"

    version := "1.0"

    val sparkVersion = "2.0.1"

    libraryDependencies ++= Seq(
    "org.apache.spark" % "spark-core_2.11" % sparkVersion,
    "org.apache.spark" % "spark-streaming_2.10" % sparkVersion,
    "org.apache.bahir" % "spark-streaming-twitter_2.11" % sparkVersion
    )

Similarly in Maven:

    <project>
<groupId>com.example</groupId>
<artifactId>simple-project</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>Simple Project</name>
<packaging>jar</packaging>
<version>1.0</version>
<properties>
<spark.version>2.0.1</spark.version>
<scala.version>2.11</scala.version>
</properties>
<dependencies>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>spark-streaming-twitter_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
</project>

  • Spark version used in Spark dependencies has to match Spark version of the Spark installation. For example if you use 1.6.1 on the cluster you have to use 1.6.1 to build jars. Minor versions mismatch are not always accepted.


  • Scala version used to build jar has to match Scala version used to build deployed Spark. By default (downloadable binaries and default builds):



    • Spark 1.x -> Scala 2.10

    • Spark 2.x -> Scala 2.11


  • Additional packages should be accessible on the worker nodes if included in the fat jar. There are number of options including:



    • --jars argument for spark-submit - to distribute local jar files.

    • --packages argument for spark-submit - to fetch dependencies from Maven repository.


    When submitting in the cluster node you should include application jar in --jars.


Ответ 3

In addition to the very extensive answer already given by user7337271, if the problem results from missing external dependencies you can build a jar with your dependencies with e.g. maven assembly plugin

In that case, make sure to mark all the core spark dependencies as "provided" in your build system and, as already noted, make sure they correlate with your runtime spark version.

Ответ 4

Dependency classes of your application shall be specified in the application-jar option of your launching command.

More details can be found at the Spark documentation

Taken from the documentation:


application-jar: Path to a bundled jar including your application and
all dependencies. The URL must be globally visible inside of your
cluster, for instance, an hdfs:// path or a file:// path that is
present on all nodes


java