首页 文章

如何在SparkSql中将额外参数传递给UDF?

提问于
浏览
12

我想解析 DataFrame 中的日期列,并且对于每个日期列,日期的分辨率可能会更改(例如,如果分辨率设置为"Month",则为2011/01/10 => 2011/01) .

我写了以下代码:

def convertDataFrame(dataframe: DataFrame, schema : Array[FieldDataType], resolution: Array[DateResolutionType]) : DataFrame =
{
  import org.apache.spark.sql.functions._
  val convertDateFunc = udf{(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDate(x, resolution)}
  val convertDateTimeFunc = udf{(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDateTime(x, resolution)}

  val allColNames = dataframe.columns
  val allCols = allColNames.map(name => dataframe.col(name))

  val mappedCols =
  {
    for(i <- allCols.indices) yield
    {
      schema(i) match
      {
        case FieldDataType.Date => convertDateFunc(allCols(i), resolution(i)))
        case FieldDataType.DateTime => convertDateTimeFunc(allCols(i), resolution(i))
        case _ => allCols(i)
      }
    }
  }

  dataframe.select(mappedCols:_*)

}}

但它不起作用 . 似乎我只能将 Column 传递给UDF . 如果我将 DataFrame 转换为 RDD 并在每一行上应用该函数,我想知道它是否会非常慢 .

有谁知道正确的解决方案?谢谢!

2 回答

  • 31

    只需使用一点点currying:

    def convertDateFunc(resolution: DateResolutionType) = udf((x:String) => 
      SparkDateTimeConverter.convertDate(x, resolution))
    

    并按如下方式使用:

    case FieldDataType.Date => convertDateFunc(resolution(i))(allCols(i))
    

    另外,你应该看一下 sql.functions.truncsql.functions.date_format . 这些至少应该是部分工作而根本不使用UDF .

    Note

    在Spark 2.2或更高版本中,您可以使用 typedLit 函数:

    import org.apache.spark.sql.functions.typedLit
    

    它支持更广泛的文字,如 SeqMap .

  • 12

    您可以使用 org.apache.spark.sql.functions 中定义的 lit(...) 函数创建文字 Column 以传递给udf

    例如:

    val takeRight = udf((s: String, i: Int) => s.takeRight(i))
    df.select(takeRight($"stringCol", lit(1)))
    

相关问题