首页 文章

解决Apache Spark中的依赖性问题

提问于
浏览
23

构建和部署Spark应用程序时的常见问题是:

  • java.lang.ClassNotFoundException .

  • object x is not a member of package y 编译错误 .

  • java.lang.NoSuchMethodError

如何解决这些问题?

5 回答

  • 19

    Apache Spark的类路径是动态构建的(以适应每个应用程序的用户代码),这使得它易受此类问题的攻击 . @user7337271的答案是正确的,但还有一些问题,取决于你正在使用的 cluster manager ("master") .

    首先,Spark应用程序由这些组件组成(每个组件都是一个单独的JVM,因此可能在其类路径中包含不同的类):

    • Driver :这是您的应用程序创建 SparkSession (或 SparkContext )并连接到集群管理器以执行实际工作

    • Cluster Manager :作为集群的"entry point",负责为每个应用程序分配执行程序 . Spark支持几种不同的类型:独立,YARN和Mesos,我们将在下面描述 .

    • Executors :这些是集群节点上的进程,执行实际工作(运行Spark任务)

    Apache Spark的cluster mode overview在此图中描述了它们之间的关系:

    现在 - which classes should reside in each of these components?

    这可以通过下图解答:

    我们慢慢解析一下:

    • Spark Code 是Spark的库 . 它们应该存在于所有三个组件中,因为它们包含允许's Spark perform the communication between them. By the way - Spark authors made a design decision to include code for ALL components in ALL components (e.g. to include code that should only run in Executor in driver too) to simplify this - so Spark' s "fat jar"(版本高达1.6)或"archive"(2.0版,下面的详细信息)包含所有组件的必要代码,并且应该在所有组件中都可用 .

    • Driver-Only Code 这是用户代码,不包括应在Executors上使用的任何内容,即RDD / DataFrame / Dataset上的任何转换中未使用的代码 . 这不一定必须与分布式用户代码分开,但也可以 .

    • Distributed Code 这是用驱动程序代码编译的用户代码,但也必须在执行程序上执行 - 实际转换使用的所有内容都必须包含在此jar中 .

    既然我们已经完成了这一点, how 我们是否可以在每个类中正确加载类组件,它们应该遵循什么规则?

    • Spark Code :如前面的答案所述,您必须在所有组件中使用相同的 ScalaSpark 版本 .

    1.1在 Standalone 模式下,应用程序(驱动程序)可以连接到"pre-existing" Spark安装 . 这意味着 all drivers must use that same Spark version 在主服务器和执行程序上运行 .

    1.2在 YARN / Mesos 中,每个应用程序可以使用不同的Spark版本,但同一应用程序的所有组件必须使用相同的版本 . 这意味着如果您使用版本X来编译和打包驱动程序应用程序,则应在启动SparkSession时提供相同的版本(例如,在使用YARN时通过 spark.yarn.archivespark.yarn.jars 参数) . 您提供的jar / archive应该包含所有Spark依赖项( including transitive dependencies ),并且在应用程序启动时,集群管理器会将其提供给每个执行程序 .

    • Driver Code :这完全取决于 - 驱动程序代码可以作为一堆jar或"fat jar"发送,只要它包含所有Spark依赖项所有用户代码

    • Distributed Code :除了出现在驱动程序中之外,还必须将此代码发送给执行程序(同样,还要附带所有传递依赖项) . 这是使用 spark.jars 参数完成的 .

    To summarize ,这是建议和部署Spark应用程序的建议方法(在这种情况下 - 使用YARN):

    • 使用您的分布式代码创建一个库,将其打包为"regular" jar(带有描述其依赖项的.pom文件)和"fat jar"(包含其所有传递依赖项) .

    • 创建一个驱动程序应用程序,在分布式代码库和Apache Spark上使用编译依赖项(具有特定版本)

    • 将驱动程序应用程序打包到胖jar中以部署到驱动程序

    • 启动 SparkSession 时,将正确版本的分布式代码作为 spark.jars 参数的值传递

    • 将包含已下载的Spark二进制文件的 lib/ 文件夹下的所有jar的存档文件(例如gzip)的位置作为 spark.yarn.archive 的值传递

  • 3

    我认为这个问题必须解决一个程序集插件 . 你需要建一个胖 jar . 例如在sbt中:

    • 使用代码 addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.0") 添加文件 $PROJECT_ROOT/project/assembly.sbt

    • to build.sbt added some libraries libraryDependencies = Seq("com.some.company" %% "some-lib"%"1.0.0")`
      在sbt控制台

    • 输入"assembly",并部署程序集jar

    如果您需要更多信息,请转到https://github.com/sbt/sbt-assembly

  • 2

    除了user7337271已经提供的非常广泛的答案之外,如果问题是由于缺少外部依赖性而导致的,那么您可以构建一个带有依赖关系的jar,例如: maven assembly plugin

    在这种情况下,请确保在构建系统中将所有核心spark依赖项标记为“已提供”,并且如前所述,确保它们与运行时spark版本相关联 .

  • 0

    应用程序的依赖项类应在启动命令的 application-jar 选项中指定 .

    更多细节可以在Spark documentation找到

    取自文档:

    application-jar:包含应用程序和所有依赖项的捆绑jar的路径 . URL必须在群集内部全局可见,例如,hdfs://路径或所有节点上存在的file://路径

  • 14

    构建和部署Spark应用程序时,所有依赖项都需要兼容版本 .

    • Scala version . 所有包都必须使用相同的主要(2.10,2.11,2.12)Scala版本 .

    考虑以下(不正确) build.sbt

    name := "Simple Project"
    
    version := "1.0"
    
    libraryDependencies ++= Seq(
       "org.apache.spark" % "spark-core_2.11" % "2.0.1",
       "org.apache.spark" % "spark-streaming_2.10" % "2.0.1",
       "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
    )
    

    我们将 spark-streaming 用于Scala 2.10,而剩余的包用于Scala 2.11 . valid 文件可能是

    name := "Simple Project"
    
    version := "1.0"
    
    libraryDependencies ++= Seq(
       "org.apache.spark" % "spark-core_2.11" % "2.0.1",
       "org.apache.spark" % "spark-streaming_2.11" % "2.0.1",
       "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
    )
    

    但最好全局指定版本并使用 %%

    name := "Simple Project"
    
    version := "1.0"
    
    scalaVersion := "2.11.7"
    
    libraryDependencies ++= Seq(
       "org.apache.spark" %% "spark-core" % "2.0.1",
       "org.apache.spark" %% "spark-streaming" % "2.0.1",
       "org.apache.bahir" %% "spark-streaming-twitter" % "2.0.1"
    )
    

    同样在Maven中:

    <project>
      <groupId>com.example</groupId>
      <artifactId>simple-project</artifactId>
      <modelVersion>4.0.0</modelVersion>
      <name>Simple Project</name>
      <packaging>jar</packaging>
      <version>1.0</version>
      <properties>
        <spark.version>2.0.1</spark.version>
      </properties> 
      <dependencies>
        <dependency> <!-- Spark dependency -->
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency> 
        <dependency>
          <groupId>org.apache.bahir</groupId>
          <artifactId>spark-streaming-twitter_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
      </dependencies>
    </project>
    
    • Spark version 所有包都必须使用相同的主要Spark版本(1.6,2.0,2.1,...) .

    考虑以下(不正确的)build.sbt:

    name := "Simple Project"
    
    version := "1.0"
    
    libraryDependencies ++= Seq(
       "org.apache.spark" % "spark-core_2.11" % "1.6.1",
       "org.apache.spark" % "spark-streaming_2.10" % "2.0.1",
       "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
    )
    

    我们使用 spark-core 1.6而其余组件都在Spark 2.0中 . valid 文件可能是

    name := "Simple Project"
    
    version := "1.0"
    
    libraryDependencies ++= Seq(
       "org.apache.spark" % "spark-core_2.11" % "2.0.1",
       "org.apache.spark" % "spark-streaming_2.10" % "2.0.1",
       "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
    )
    

    但最好使用变量:

    name := "Simple Project"
    
    version := "1.0"
    
    val sparkVersion = "2.0.1"
    
    libraryDependencies ++= Seq(
       "org.apache.spark" % "spark-core_2.11" % sparkVersion,
       "org.apache.spark" % "spark-streaming_2.10" % sparkVersion,
       "org.apache.bahir" % "spark-streaming-twitter_2.11" % sparkVersion
    )
    

    同样在Maven中:

    <project>
      <groupId>com.example</groupId>
      <artifactId>simple-project</artifactId>
      <modelVersion>4.0.0</modelVersion>
      <name>Simple Project</name>
      <packaging>jar</packaging>
      <version>1.0</version>
      <properties>
        <spark.version>2.0.1</spark.version>
        <scala.version>2.11</scala.version>
      </properties> 
      <dependencies>
        <dependency> <!-- Spark dependency -->
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_${scala.version}</artifactId>
          <version>${spark.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_${scala.version}</artifactId>
          <version>${spark.version}</version>
        </dependency> 
        <dependency>
          <groupId>org.apache.bahir</groupId>
          <artifactId>spark-streaming-twitter_${scala.version}</artifactId>
          <version>${spark.version}</version>
        </dependency>
      </dependencies>
    </project>
    
    • Spark依赖项中使用的Spark版本必须匹配Spark安装的Spark版本 . 例如,如果在群集上使用1.6.1,则必须使用1.6.1来构建jar . 不一定接受次要版本不匹配 .

    • 用于构建jar的Scala版本必须与用于构建部署Spark的Scala版本相匹配 . 默认情况下(可下载的二进制文件和默认构建):

    • Spark 1.x - > Scala 2.10

    • Spark 2.x - > Scala 2.11

    • 如果包含在fat jar中,则应该可以在工作节点上访问其他包 . 有很多选择,包括:

    2587 _的

    • --jars 参数 - 分发本地 jar 文件 .
      spark-submit
    • --packages 参数 - 从Maven存储库获取依赖项 .

    在群集节点中提交时,应在 --jars 中包含应用程序 jar .

相关问题