I want to run a Spark job on Spark Jobserver. During execution, I get an exception:
stack:
java.lang.RuntimeException: scala.ScalaReflectionException: class com.some.example.instrument.data.SQLMapping in JavaMirror with org.apache.spark.util.MutableURLClassLoader@55b699ef of type class org.apache.spark.util.MutableURLClassLoader with classpath [file:/app/spark-job-server.jar] and parent being sun.misc.Launcher$AppClassLoader@2e817b38 of type class sun.misc.Launcher$AppClassLoader with classpath [.../classpath jars/] not found.
at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:123)
at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:22)
at com.some.example.instrument.DataRetriever$$anonfun$combineMappings$1$$typecreator15$1.apply(DataRetriever.scala:136)
at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232)
at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:49)
at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:233)
at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:33)
at com.some.example.instrument.DataRetriever$$anonfun$combineMappings$1.apply(DataRetriever.scala:136)
at com.some.example.instrument.DataRetriever$$anonfun$combineMappings$1.apply(DataRetriever.scala:135)
at scala.util.Success$$anonfun$map$1.apply(Try.scala:237)
at scala.util.Try$.apply(Try.scala:192)
at scala.util.Success.map(Try.scala:237)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
In DataRetriever, I convert a simple case class to a Dataset.
case class definitions:
case class SQLMapping(id: String,
                      it: InstrumentPrivateKey,
                      cc: Option[String],
                      ri: Option[SourceInstrumentId],
                      p: Option[SourceInstrumentId],
                      m: Option[SourceInstrumentId])

case class SourceInstrumentId(instrumentId: Long,
                              providerId: String)

case class InstrumentPrivateKey(instrumentId: Long,
                                providerId: String,
                                clientId: String)
code that causes the problem:
import session.implicits._

def someFunc(future: Future[ID]): Future[Dataset[SQLMapping]] = {
  future.map { f =>
    val seq: Seq[SQLMapping] = getFromEndpoint(f)
    val ds: Dataset[SQLMapping] = seq.toDS()
    ...
  }
}
The job sometimes works, but if I restart it, the exception is thrown.
update 28.03.2018 I forgot to mention a detail that turned out to be important: the Dataset is constructed inside a Future.
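The exception points at a classloader mismatch: Spark derives the encoder for SQLMapping via runtime reflection using the current thread's context classloader, and the ForkJoinPool worker threads that run future.map may carry a different loader than the MutableURLClassLoader that loaded the job jar. A minimal diagnostic sketch (plain Scala, no Spark; in a standalone app the two loaders usually match, while under Jobserver they can differ) that compares the loader seen on the calling thread with the one seen inside a Future:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object LoaderCheck {
  // Returns the context classloaders seen on the calling thread and
  // inside a Future, i.e. on a ForkJoinPool worker thread.
  def loaders(): (ClassLoader, ClassLoader) = {
    // Loader visible to code running on the calling (driver) thread.
    val callerLoader = Thread.currentThread().getContextClassLoader

    // Loader visible inside the Future. Under Spark Jobserver this can
    // be a loader that never saw the job jar, so reflection inside
    // future.map fails to find classes such as SQLMapping.
    val futureLoader = Await.result(
      Future { Thread.currentThread().getContextClassLoader },
      5.seconds)

    (callerLoader, futureLoader)
  }
}
```

Comparing the two values (e.g. `val (a, b) = LoaderCheck.loaders()`) inside the job shows whether future.map runs under a different loader.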
1 Answer
Calling toDS() inside the future causes the ScalaReflectionException. I decided to construct the Dataset outside of future.map. You can verify with this example job that a Dataset cannot be constructed inside future.map. I will provide information later on whether the problem still occurs with Spark 2.3.0, and when passing the jar directly via spark-submit.
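The workaround above can be sketched as follows: resolve the Future first, then build the Dataset on the caller's thread, outside future.map. This is a minimal, Spark-free sketch; SQLMapping is simplified to one field and getFromEndpoint is a hypothetical stand-in for the question's endpoint call, so the pattern is self-contained (in the real job the final conversion would be seq.toDS() with session.implicits._ in scope):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Simplified stand-in for the question's case class.
case class SQLMapping(id: String)

object FixSketch {
  // Hypothetical stand-in for the endpoint call from the question.
  def getFromEndpoint(id: Long): Seq[SQLMapping] =
    Seq(SQLMapping(id.toString))

  // Do the fetching inside the Future, but return the plain Seq.
  // The caller then converts it to a Dataset on its own thread,
  // where the classloader that loaded the job jar is in effect.
  def fetchMappings(future: Future[Long]): Seq[SQLMapping] =
    Await.result(future.map(id => getFromEndpoint(id)), 10.seconds)
}
```

The calling code would then do something like `val ds = FixSketch.fetchMappings(f).toDS()`, keeping all encoder derivation out of the pool threads.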