我想解析 DataFrame
中的日期列,并且对于每个日期列,日期的分辨率可能会更改(例如,如果分辨率设置为"Month",则为2011/01/10 => 2011/01) .
我写了以下代码:
def convertDataFrame(dataframe: DataFrame, schema : Array[FieldDataType], resolution: Array[DateResolutionType]) : DataFrame =
{
import org.apache.spark.sql.functions._
val convertDateFunc = udf{(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDate(x, resolution)}
val convertDateTimeFunc = udf{(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDateTime(x, resolution)}
val allColNames = dataframe.columns
val allCols = allColNames.map(name => dataframe.col(name))
val mappedCols =
{
for(i <- allCols.indices) yield
{
schema(i) match
{
case FieldDataType.Date => convertDateFunc(allCols(i), resolution(i)))
case FieldDataType.DateTime => convertDateTimeFunc(allCols(i), resolution(i))
case _ => allCols(i)
}
}
}
dataframe.select(mappedCols:_*)
}}
但它不起作用 . 似乎我只能将 Column
传递给UDF . 如果我将 DataFrame
转换为 RDD
并在每一行上应用该函数,我想知道它是否会非常慢 .
有谁知道正确的解决方案?谢谢!
2 回答
只需使用一点点currying:
并按如下方式使用:
另外,你应该看一下
sql.functions.trunc
和sql.functions.date_format
. 这些至少应该是部分工作而根本不使用UDF .Note :
在Spark 2.2或更高版本中,您可以使用
typedLit
函数:它支持更广泛的文字,如
Seq
或Map
.您可以使用
org.apache.spark.sql.functions
中定义的lit(...)
函数创建文字Column
以传递给udf例如: