
Scala Reflection exception during DataSet creation in Spark


I want to run a Spark job on Spark Jobserver. During execution I get an exception:

Stack trace:

java.lang.RuntimeException: scala.ScalaReflectionException: class com.some.example.instrument.data.SQLMapping in JavaMirror with org.apache.spark.util.MutableURLClassLoader@55b699ef of type class org.apache.spark.util.MutableURLClassLoader with classpath [file:/app/spark-job-server.jar] and parent being sun.misc.Launcher$AppClassLoader@2e817b38 of type class sun.misc.Launcher$AppClassLoader with classpath [.../classpath jars/] not found.
    at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:123)
    at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:22)
    at com.some.example.instrument.DataRetriever$$anonfun$combineMappings$1$$typecreator15$1.apply(DataRetriever.scala:136)
    at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232)
    at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:49)
    at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
    at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:233)
    at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:33)
    at com.some.example.instrument.DataRetriever$$anonfun$combineMappings$1.apply(DataRetriever.scala:136)
    at com.some.example.instrument.DataRetriever$$anonfun$combineMappings$1.apply(DataRetriever.scala:135)
    at scala.util.Success$$anonfun$map$1.apply(Try.scala:237)
    at scala.util.Try$.apply(Try.scala:192)
    at scala.util.Success.map(Try.scala:237)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

In DataRetriever I convert a simple case class to a DataSet.

case class definitions:

case class SQLMapping(id: String,
                      it: InstrumentPrivateKey,
                      cc: Option[String],
                      ri: Option[SourceInstrumentId],
                      p: Option[SourceInstrumentId],
                      m: Option[SourceInstrumentId])

case class SourceInstrumentId(instrumentId: Long,
                              providerId: String)

case class InstrumentPrivateKey(instrumentId: Long,
                                providerId: String,
                                clientId: String)

code that causes a problem:

import session.implicits._

def someFunc(future: Future[ID]): Dataset[SQLMapping] = {
  future.map { f =>
    val seq: Seq[SQLMapping] = getFromEndpoint(f)
    // toDS() is invoked here, i.e. inside future.map on an ExecutionContext thread
    val ds: Dataset[SQLMapping] = seq.toDS()
    ...
  }
}
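
For reference, the implicit seq.toDS() call above expands to roughly the explicit form below (a sketch only, reusing the session and seq values from the snippet); the encoder derivation via Encoders.product and a TypeTag is where the stack trace shows the reflective class lookup for SQLMapping failing:

import org.apache.spark.sql.{Dataset, Encoders}

// Approximate desugaring of seq.toDS(): the implicit newProductEncoder
// resolves an Encoder[SQLMapping] via TypeTag-based runtime reflection,
// which is the staticClass lookup that fails in the stack trace above.
val ds: Dataset[SQLMapping] = session.createDataset(seq)(Encoders.product[SQLMapping])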

The job sometimes succeeds, but if I re-run it, it throws the exception above.

Update 28.03.2018: I forgot to mention one detail that turned out to be important: the Dataset is constructed inside a Future.

1 Answer


    Calling toDS() inside a future caused the ScalaReflectionException.

    I decided to construct the DataSet outside of future.map; a rough sketch of that restructuring is shown below.
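
    A minimal sketch of that restructuring, applied to someFunc from the question (it assumes the ID, SQLMapping, getFromEndpoint and session definitions from the question; the global ExecutionContext and the 10-second timeout are only illustrative):

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global
    import org.apache.spark.sql.Dataset
    import session.implicits._

    def someFunc(future: Future[ID]): Dataset[SQLMapping] = {
      // Only the plain Scala collection is produced inside the future.
      val seqFuture: Future[Seq[SQLMapping]] = future.map(f => getFromEndpoint(f))
      val seq: Seq[SQLMapping] = Await.result(seqFuture, 10.seconds)
      // toDS() now runs on the calling thread, outside future.map.
      seq.toDS()
    }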

    You can use this example job to verify that a Dataset cannot be built inside future.map:

    package com.example.sparkapplications
    
    import com.typesafe.config.Config
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SparkSession
    import scala.concurrent.Await
    import scala.concurrent.Future
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global
    import spark.jobserver.SparkJob
    import spark.jobserver.SparkJobValid
    import spark.jobserver.SparkJobValidation
    
    object FutureJob extends SparkJob{
      override def runJob(sc: SparkContext,
                          jobConfig: Config): Any = {
        val session = SparkSession.builder().config(sc.getConf).getOrCreate()
        import session.implicits._
        val f = Future{
          val seq = Seq(
            Dummy("1", 1),
            Dummy("2", 2),
            Dummy("3", 3),
            Dummy("4", 4),
            Dummy("5", 5)
          )
    
          val ds = seq.toDS
    
          ds.collect()
        }
    
        Await.result(f, 10 seconds)
      }
    
      case class Dummy(id: String, value: Long)
      override def validate(sc: SparkContext,
                            config: Config): SparkJobValidation = SparkJobValid
    }
    

    I will post more information later on whether the problem still occurs with Spark 2.3.0 and when the jar is submitted directly via spark-submit.
