我为它创建了一个JSON数据和一个Avro架构:
{"username":"miguno","tweet":"Rock: Nerf paper, scissors is fine.","timestamp": 1366150681 }
{"username":"BlizzardCS","tweet":"Works as intended. Terran is IMBA.","timestamp": 1366154481 }
和
{“type”:“record”,“name”:“twitter_schema”,“namespace”:“com.miguno.avro”,“fields”:[{“name”:“username”,“type”:“string” “,”doc“:”Twitter.com上用户帐户的名称“},{”name“:”tweet“,”type“:”string“,”doc“:”用户Twitter消息的内容“} ,{“name”:“timestamp”,“type”:“long”,“doc”:“Unix纪元时间以秒为单位”},“doc:”:“存储Twitter消息的基本架构”}
然后我将其转换为Avro,如下所示:
java -jar ~/avro-tools-1.7.4.jar fromjson --schema-file twitter.avsc twitter.json > twitter.avro
将文件放在hdfs上:
hadoop fs -copyFromLocal twitter.avro <path>
然后在Spark CLI中我发布了代码:
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
import org.apache.hadoop.io.NullWritable
val path = "hdfs:///path/to/your/avro/folder"
val avroRDD = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable, AvroInputFormat[GenericRecord]](path)
但是在做的时候:
avroRDD.first
我面临以下例外:
org.apache.spark.SparkException:作业因阶段失败而中止:阶段7.0(TID 13)中的任务2.0有一个不可序列化的结果:org.apache.spark.scheduler.DAGScheduler上的org.apache.avro.mapred.AvroWrapper .org $ apache $ spark $ scheduler $ DAGScheduler $$ failJobAndIndependentStages(DAGScheduler.scala:1185)org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.apply(DAGScheduler.scala:1174)org.apache . spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.apply(DAGScheduler.scala:1173)scala.collection.mutable.ResizableArray $ class.foreach(ResizableArray.scala:59)at scala.collection.mutable.ArrayBuffer.foreach( ArrayBuffer.scala:47)
它有什么解决方案?
1 回答
Spark正在尝试服务/删除你的avro数据,但它不是“java serializable”(默认的ser . 用于spark) .
你有几个选择:
从包装器中提取通用记录,并将每个记录映射到某个可序列化的结构
生成特定的记录类并为它们设置deser而不是通用记录(您仍需要从包装器中提取记录)
启用kryo序列化(这可以在 some cases only 中使用)
请注意,记录在内部重复使用,因此如果您执行rdd.collect,则最终会得到具有相同值的所有记录 . 在执行收集之前将原始输入数据映射到其他内容可以在执行复制时解决问题 .