val test = myDF.select("my_column").rdd.map(r => getTimestamp(r))
但这会返回一个RDD,只返回已转换的列 .
1 回答
36
如果你真的需要使用你的功能,我可以建议两个选项:
1)使用map / toDF:
import org.apache.spark.sql.Row
import sqlContext.implicits._
def getTimestamp: (String => java.sql.Timestamp) = // your function here
val test = myDF.select("my_column").rdd.map {
case Row(string_val: String) => (string_val, getTimestamp(string_val))
}.toDF("my_column", "new_column")
2)使用UDF( UserDefinedFunction ):
import org.apache.spark.sql.functions._
def getTimestamp: (String => java.sql.Timestamp) = // your function here
val newCol = udf(getTimestamp).apply(col("my_column")) // creates the new column
val test = myDF.withColumn("new_column", newCol) // adds the new column to original DF
1 回答
如果你真的需要使用你的功能,我可以建议两个选项:
1)使用map / toDF:
2)使用UDF(
UserDefinedFunction
):有关this nice article by Bill Chambers中Spark SQL UDF的更多详细信息 .
Alternatively ,
如果您只想将
StringType
列转换为TimestampType
列,则可以使用自Spark SQL 1.5以来可用的unix_timestamp
column function:注意:对于spark 1.5.x,在转换为timestamp(issue SPARK-11724)之前,必须将
unix_timestamp
的结果乘以1000
. 结果代码是:编辑:添加了udf选项