首页 文章

如何优雅地将多列行转换为数据帧?

提问于
浏览
2

我想使用StructType将rdd转换为dataframe . 但项目 "Broken,Line," 会导致错误 . 是否有一种优雅的方式来处理这样的记录?谢谢 .

import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.apache.spark.sql.Row

val mySchema = StructType(Array(
  StructField("colA", StringType, true),
  StructField("colB", StringType, true),
  StructField("colC", StringType, true)))
val x = List("97573,Start,eee", "9713,END,Good", "Broken,Line,")
val inputx = sc.parallelize(x).
| map((x:String) => Row.fromSeq(x.split(",").slice(0,mySchema.size).toSeq))
val df = spark.createDataFrame(inputx, mySchema)
df.show

错误是这样的:

名称:org.apache.spark.SparkException消息:作业因阶段失败而中止:阶段14.0中的任务0失败1次,最近失败:阶段14.0中丢失的任务0.0(TID 14,localhost, Actuator 驱动程序):java . lang.RuntimeException:编码时出错:java.lang.ArrayIndexOutOfBoundsException:2

我正在使用: Spark: 2.2.0
Scala: 2.11.8

我在 Spark-shell 中运行了代码 .

1 回答

  • 2

    我们应用您的架构的 Row.fromSeq 会引发您获得的错误 . 列表中的第三个元素仅包含2个元素 . 除非添加空值而不是缺少值,否则无法将其转换为具有三个元素的行 .

    在创建DataFrame时,Spark期望每行使用3个元素来应用模式,从而产生错误 .

    一个快速而肮脏的解决方案是使用 scala.util.Try 分别获取字段:

    import org.apache.spark.sql.types.{StructType, StructField, StringType}
    import org.apache.spark.sql.Row
    import scala.util.Try
    
    val mySchema = StructType(Array(StructField("colA", StringType, true), StructField("colB", StringType, true), StructField("colC", StringType, true)))
    
    val l = List("97573,Start,eee", "9713,END,Good", "Broken,Line,")
    
    val rdd = sc.parallelize(l).map {
     x => {
      val fields = x.split(",").slice(0, mySchema.size)
      val f1 = Try(fields(0)).getOrElse("")
      val f2 = Try(fields(1)).getOrElse("")
      val f3 = Try(fields(2)).getOrElse("")
      Row(f1, f2, f3)
     }
    }
    
    val df = spark.createDataFrame(rdd, mySchema)
    
    df.show
    // +------+-----+----+
    // |  colA| colB|colC|
    // +------+-----+----+
    // | 97573|Start| eee|
    // |  9713|  END|Good|
    // |Broken| Line|    |
    // +------+-----+----+
    

    我不会像你问过的那样优雅的解决方案 . 解析字符串永远不会优雅!您应该使用 csv 源正确读取它(或 spark-csv 表示<2.x) .

相关问题