我想使用StructType将rdd转换为dataframe . 但项目 "Broken,Line,"
会导致错误 . 是否有一种优雅的方式来处理这样的记录?谢谢 .
import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.apache.spark.sql.Row
val mySchema = StructType(Array(
StructField("colA", StringType, true),
StructField("colB", StringType, true),
StructField("colC", StringType, true)))
val x = List("97573,Start,eee", "9713,END,Good", "Broken,Line,")
val inputx = sc.parallelize(x).
| map((x:String) => Row.fromSeq(x.split(",").slice(0,mySchema.size).toSeq))
val df = spark.createDataFrame(inputx, mySchema)
df.show
错误是这样的:
名称:org.apache.spark.SparkException消息:作业因阶段失败而中止:阶段14.0中的任务0失败1次,最近失败:阶段14.0中丢失的任务0.0(TID 14,localhost, Actuator 驱动程序):java . lang.RuntimeException:编码时出错:java.lang.ArrayIndexOutOfBoundsException:2
我正在使用: Spark: 2.2.0
Scala: 2.11.8
我在 Spark-shell 中运行了代码 .
1 回答
我们应用您的架构的
Row.fromSeq
会引发您获得的错误 . 列表中的第三个元素仅包含2个元素 . 除非添加空值而不是缺少值,否则无法将其转换为具有三个元素的行 .在创建DataFrame时,Spark期望每行使用3个元素来应用模式,从而产生错误 .
一个快速而肮脏的解决方案是使用
scala.util.Try
分别获取字段:我不会像你问过的那样优雅的解决方案 . 解析字符串永远不会优雅!您应该使用
csv
源正确读取它(或spark-csv
表示<2.x) .