
Datasets in Spark 1.6


I am evaluating replacing my existing RDD code with Datasets. For one of my use cases, I am unable to map a Dataset to another case class.

Here is what I am trying to do...

case class MyMap(map: Map[String, String])

case class V1(a: String, b: String){
  def toMyMap: MyMap = {
    MyMap(Map(a->b))
  }

  def toStr: String = {
    a
  }
}

object MyApp extends App {
  // Get a handle to sqlContext and other useful stuff here.
  val df1 = sqlContext.createDataset(Seq(V1("2015-05-01", "data1"), V1("2015-05-01", "data2"))).toDF()
  df1.as[V1].map(_.toMyMap).show() // Errors out. Added the exception below.
  df1.as[V1].map(_.toStr).show()   // Works fine.
}

Any help would be appreciated.

Here is the exception:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1
Serialization stack:
	- object not serializable (class: scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1, value: package lang)
	- field (class: scala.reflect.internal.Types$ThisType, name: sym, type: class scala.reflect.internal.Symbols$Symbol)
	- object (class scala.reflect.internal.Types$UniqueThisType, java.lang.type)
	- field (class: scala.reflect.internal.Types$TypeRef, name: pre, type: class scala.reflect.internal.Types$Type)
	- object (class scala.reflect.internal.Types$ClassNoArgsTypeRef, String)
	- field (class: scala.reflect.internal.Types$TypeRef, name: normalized, type: class scala.reflect.internal.Types$Type)
	- object (class scala.reflect.internal.Types$AliasNoArgsTypeRef, String)
	- field (class: org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, name: keyType$1, type: class scala.reflect.api.Types$TypeApi)
	- object (class org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, ...)
	- field (class: org.apache.spark.sql.catalyst.expressions.MapObjects, name: function, type: interface scala.Function1)
	- object (class org.apache.spark.sql.catalyst.expressions.MapObjects, mapobjects(..., invoke(upcast('map, MapType(StringType,StringType,true), - field (class: "scala.collection.immutable.Map", name: "map"), - root class: "collector.MyMap"), keyArray, ArrayType(StringType,true)), StringType))
	- field (class: org.apache.spark.sql.catalyst.expressions.Invoke, name: targetObject, type: class org.apache.spark.sql.catalyst.expressions.Expression)
	- object (class org.apache.spark.sql.catalyst.expressions.Invoke, invoke(mapobjects(...), array, ObjectType(class [Ljava.lang.Object;)))
	- writeObject data (class: scala.collection.immutable.List$SerializationProxy)
	- object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@7e78c3cf)
	- writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
	- object (class scala.collection.immutable.$colon$colon, List(invoke(mapobjects(... keyArray ...), ...), invoke(mapobjects(... valueArray ...), ...)))
	- field (class: org.apache.spark.sql.catalyst.expressions.StaticInvoke, name: arguments, type: interface scala.collection.Seq)
	- object (class org.apache.spark.sql.catalyst.expressions.StaticInvoke, staticinvoke(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData$, ObjectType(interface scala.collection.Map), toScalaMap, ...))
	- writeObject data (class: scala.collection.immutable.List$SerializationProxy)
	- object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@377795c5)
	- writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
	- object (class scala.collection.immutable.$colon$colon, List(staticinvoke(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData$, ObjectType(interface scala.collection.Map), toScalaMap, ...)))
	- field (class: org.apache.spark.sql.catalyst.expressions.NewInstance, name: arguments, type: interface scala.collection.Seq)
	- object (class org.apache.spark.sql.catalyst.expressions.NewInstance, newinstance(class collector.MyMap, staticinvoke(...), false, ObjectType(class collector.MyMap), None))
	- field (class: org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, name: fromRowExpression, type: class org.apache.spark.sql.catalyst.expressions.Expression)
	- object (class org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, class[map#ExprId(9,255a02aa-f2fa-482d-8cd1-63e2d4d08b30): map])
	- field (class: org.apache.spark.sql.execution.MapPartitions, name: uEncoder, type: class org.apache.spark.sql.catalyst.encoders.ExpressionEncoder)
	- object (class org.apache.spark.sql.execution.MapPartitions, !MapPartitions, class[a[0]: string, b[0]: string], class[map#ExprId(9,255a02aa-f2fa-482d-8cd1-63e2d4d08b30): map], [map#13] +- LocalTableScan [a#2,b#3], [[0,180000000a,2800000005,2d35302d35313032,3130,3161746164],[0,180000000a,2800000005,2d35302d35313032,3130,3261746164]])
	- field (class: org.apache.spark.sql.execution.MapPartitions$$anonfun$8, name: $outer, type: class org.apache.spark.sql.execution.MapPartitions)
	- object (class org.apache.spark.sql.execution.MapPartitions$$anonfun$8, ...)
	- field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, name: f$22, type: interface scala.Function1)
	- object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, ...)
	- field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21, name: $outer, type: class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1)
	- object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21, ...)
	- field (class: org.apache.spark.rdd.MapPartitionsRDD, name: f, type: interface scala.Function3)
	- object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[1] at show at CollectorSparkTest.scala:50)
	- field (class: org.apache.spark.NarrowDependency, name: rdd, type: class org.apache.spark.rdd.RDD)
	- object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@110f15b7)
	- writeObject data (class: scala.collection.immutable.List$SerializationProxy)
	- object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@6bb23696)
	- writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
	- object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@110f15b7))
	- field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies, type: interface scala.collection.Seq)
	- object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[2] at show at CollectorSparkTest.scala:50)
	- field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
	- object (class scala.Tuple2, (MapPartitionsRDD[2] at show at CollectorSparkTest.scala:50,...))
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
	at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1010)
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:921)
	at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:861)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1607)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212)
	at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
	at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
	at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538)
	at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
	at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2125)
	at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1537)
	at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1544)
	at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1414)
	at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1413)
	at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2138)
	at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1413)
	at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1495)
	at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:171)
	at org.apache.spark.sql.DataFrame.show(DataFrame.scala:394)
	at org.apache.spark.sql.Dataset.show(Dataset.scala:228)
	at org.apache.spark.sql.Dataset.show(Dataset.scala:192)
	at org.apache.spark.sql.Dataset.show(Dataset.scala:200)

2 Answers

  • 2

    The problem is that the Scala Map class is not serializable, so the Dataset API cannot automatically generate an appropriate encoder. I would suggest converting the Map to a String, then parsing the String and converting it back to a Map (assuming you are storing Strings in the Map); a sketch of this round trip is shown after the answers below.

    The Dataset API may also not be the best option here. I wrote this article, which may be worth a look.

  • 1

    I think you may actually be hitting SPARK-12696, which has been fixed in spark/master. I'm hoping 1.6.1 will be released in the near future, and it should include this patch.
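
For reference, here is a minimal sketch of the String round trip suggested in the first answer, assuming the Map only ever holds String keys and values and that a simple "k=v;k=v" encoding (which must not clash with characters in the data) is acceptable. The MyMapStr case class and its toMyMap helper are illustrative names, not part of the original code; df1, V1 and MyMap are as defined in the question.

case class MyMapStr(pairs: String) {
  // Illustrative helper: parse the "k=v;k=v" string back into MyMap in plain Scala code.
  def toMyMap: MyMap =
    MyMap(pairs.split(";").filter(_.nonEmpty).map { kv =>
      val Array(k, v) = kv.split("=", 2)
      k -> v
    }.toMap)
}

// Keep only String fields inside the Dataset so Spark 1.6 can derive an encoder,
// and rebuild the Map after the data has left the Dataset (e.g. after collect()):
val strDs = df1.as[V1].map(v => MyMapStr(s"${v.a}=${v.b}"))
strDs.show()
val myMaps = strDs.collect().map(_.toMyMap)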
