首页 文章

Spark Scala:如何转换DF中的列

提问于
浏览 781
21

我在Spark中有一个数据框,有很多列和我定义的udf . 我希望返回相同的数据帧,除非转换了一列 . 此外,我的udf接受一个字符串并返回一个时间戳 . 是否有捷径可寻?我试过了

val test = myDF.select("my_column").rdd.map(r => getTimestamp(r))

但这会返回一个RDD,只返回已转换的列 .

1 回答

  • 36

    如果你真的需要使用你的功能,我可以建议两个选项:

    1)使用map / toDF:

    import org.apache.spark.sql.Row
    import sqlContext.implicits._
    
    def getTimestamp: (String => java.sql.Timestamp) = // your function here
    
    val test = myDF.select("my_column").rdd.map {
      case Row(string_val: String) => (string_val, getTimestamp(string_val))
    }.toDF("my_column", "new_column")
    

    2)使用UDF( UserDefinedFunction ):

    import org.apache.spark.sql.functions._
    
    def getTimestamp: (String => java.sql.Timestamp) = // your function here
    
    val newCol = udf(getTimestamp).apply(col("my_column")) // creates the new column
    val test = myDF.withColumn("new_column", newCol) // adds the new column to original DF
    

    有关this nice article by Bill Chambers中Spark SQL UDF的更多详细信息 .


    Alternatively

    如果您只想将 StringType 列转换为 TimestampType 列,则可以使用自Spark SQL 1.5以来可用的 unix_timestamp column function

    val test = myDF
      .withColumn("new_column", unix_timestamp(col("my_column"), "yyyy-MM-dd HH:mm").cast("timestamp"))
    

    注意:对于spark 1.5.x,在转换为timestamp(issue SPARK-11724)之前,必须将 unix_timestamp 的结果乘以 1000 . 结果代码是:

    val test = myDF
      .withColumn("new_column", (unix_timestamp(col("my_column"), "yyyy-MM-dd HH:mm") *1000L).cast("timestamp"))
    

    编辑:添加了udf选项

相关问题