
Apply a UDF to columns in a Spark DataFrame, changing the function depending on the column


I have a Spark DataFrame containing string data that I want to map to numeric data, as shown below (simplified version):

+--------------------+-------+----------+-------------------------+
|     participantUUID|001_Age|002_Gender|003_Where did you grow up|
+--------------------+-------+----------+-------------------------+
|010A0550-4324-490...|     23|    Female|                In a town|
|031C5411-FE42-429...|     56|      Male|                In a town|
|038688FF-B5DA-484...|     32|    Female|                In a town|
|05F8E1AF-AFDD-441...|     54|    Female|          Multiple places|
|068B213C-3303-41E...|     23|    Female|                In a town|
|11A9A444-3E93-468...|     39|    Female|                In a town|

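There are many columns, and rather than applying the mapping to each column by hand, I want to apply it across the whole DataFrame, column by column.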

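The mapping from strings to numbers differs by column. For example, in one column the strings "Poor", "Fair", "Good", "Very good" would score 1, 2, 3, 4; in another column the scores might be 4, 3, 2, 1. So I want to write a UDF that takes the column header and the string value as arguments, and then apply it across the DataFrame columns with foldLeft, as follows: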

val calculateScore = udf((columnName: String, answerText: String) => (columnName, answerText) match {

      case ("002_Gender", "Female") => 0
      case ("002_Gender", "Male") => 1
      case ("002_Gender", "Other") => 2

      case ("003_Where did you grow up", "In a village") => 0 
      case ("003_Where did you grow up", "In a town") => 1
      case ("003_Where did you grow up", "Multiple places") => 2
      case _ => -1
    })

val columnNames = Seq("001_Age", "002_Gender", "003_Where did you grow up")

val newDF: DataFrame = columnNames.foldLeft(baseDF)(
      (baseDF, c) =>
        baseDF.withColumn(c.concat("_numeric"), calculateScore(baseDF(c), baseDF(c)))
    )

However, this does not return the correct result: every value comes out as -1, which means the UDF is not matching correctly:

+--------------------+----------------+----------+------------------+-------------------------+---------------------------------+
|     participantUUID|assessmentNumber|002_Gender|002_Gender_numeric|003_Where did you grow up|003_Where did you grow up_numeric|
+--------------------+----------------+----------+------------------+-------------------------+---------------------------------+
|010A0550-4324-490...|               0|    Female|                -1|                In a town|                               -1|
|031C5411-FE42-429...|               0|      Male|                -1|                In a town|                               -1|
|038688FF-B5DA-484...|               0|    Female|                -1|                In a town|                               -1|
|05F8E1AF-AFDD-441...|               0|    Female|                -1|          Multiple places|                               -1|
|068B213C-3303-41E...|               0|    Female|                -1|                In a town|                               -1|

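I think this is down to the syntax of the calculateScore UDF call, which should take the string column name and the answer text and return an Int, evaluated row by row within the column. In other words, the foldLeft statement has the form: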

val newDF: DataFrame = columnNames.foldLeft[DataFrame](baseDF)(
      (acc, c) =>
        acc.withColumn(c, col(c))
    )

So calculateScore(baseDF(c), baseDF(c)) should return an object of type Column, but something has clearly gone wrong.

Any ideas would be much appreciated, thanks!

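NB. I have already looked at "Apply UDF to multiple columns in Spark Dataframe", but I don't like the idea of using a var DF, because in my view it goes against the principle of immutable programming in Scala!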
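The per-column mappings described above can also be kept as data rather than as one large match. The following is only a minimal plain-Scala sketch that runs without Spark; `scoreTables` and `scorerFor` are hypothetical names introduced for illustration, and the -1 fallback mirrors the `case _ => -1` branch above.

```scala
// Hypothetical lookup tables: column name -> (answer text -> score).
val scoreTables: Map[String, Map[String, Int]] = Map(
  "002_Gender" -> Map("Female" -> 0, "Male" -> 1, "Other" -> 2),
  "003_Where did you grow up" -> Map(
    "In a village" -> 0,
    "In a town" -> 1,
    "Multiple places" -> 2
  )
)

// Builds a String => Int scorer for one column.
// Unknown columns and unknown answers both score -1.
def scorerFor(columnName: String): String => Int = {
  val table = scoreTables.getOrElse(columnName, Map.empty[String, Int])
  answerText => table.getOrElse(answerText, -1)
}
```

A function built this way could presumably be wrapped with `udf(scorerFor(c))` inside the fold, one UDF per column name; that step is an assumption and is not exercised here.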

2 Answers

  • 0
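    import org.apache.spark.sql.functions.udf
    import spark.implicits._ // needed for toDF; assumes a SparkSession named spark, as in spark-shell

    val baseDF = Seq(("Female", "In a town"), ("Male", "Multiple places")).toDF("002_Gender", "003_Where did you grow up")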
     baseDF.show
    +----------+-------------------------+
    |002_Gender|003_Where did you grow up|
    +----------+-------------------------+
    |    Female|                In a town|
    |      Male|          Multiple places|
    +----------+-------------------------+
    
    def calculateScore(columnName: String) = udf((answerText: String) => (columnName, answerText) match {
    
      case ("002_Gender", "Female") => 0
      case ("002_Gender", "Male") => 1
      case ("002_Gender", "Other") => 2
    
      case ("003_Where did you grow up", "In a village") => 0 
      case ("003_Where did you grow up", "In a town") => 1
      case ("003_Where did you grow up", "Multiple places") => 2
      case _ => -1
    })
    
    val columnNames = Seq("002_Gender", "003_Where did you grow up")
    
    val newDF = columnNames.foldLeft(baseDF)(
        (baseDF, c) =>
          baseDF.withColumn(c.concat("_numeric"), calculateScore(c)(baseDF(c)))
       )
     newDF.show
    
  • 0

    You are passing exactly the same argument to the UDF twice, so the column value is passed as both arguments and only the default case _ matches.

    You need to pass lit(c) as the first argument.

    df.show
    +----------+-------------------------+
    |002_Gender|003_Where did you grow up|
    +----------+-------------------------+
    |    Female|                In a town|
    |      Male|          Multiple places|
    +----------+-------------------------+
    
    columnNames.foldLeft(df)( (df,c) => df.withColumn(c.concat("_numeric") , calculateScore(lit(c) , df(c) ) ) ).show(false)
    
    +----------+-------------------------+------------------+---------------------------------+
    |002_Gender|003_Where did you grow up|002_Gender_numeric|003_Where did you grow up_numeric|
    +----------+-------------------------+------------------+---------------------------------+
    |Female    |In a town                |0                 |1                                |
    |Male      |Multiple places          |1                 |2                                |
    +----------+-------------------------+------------------+---------------------------------+
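
The difference between the two calls can be checked without a Spark session by writing the same pattern match as a plain function and feeding it the values each call would supply for a single row. This is only an illustrative sketch; `score`, `buggy`, and `fixed` are hypothetical names, not part of the original code.

```scala
// The same pattern match as in the question, as a plain Scala function.
def score(columnName: String, answerText: String): Int =
  (columnName, answerText) match {
    case ("002_Gender", "Female") => 0
    case ("002_Gender", "Male")   => 1
    case ("002_Gender", "Other")  => 2
    case ("003_Where did you grow up", "In a village")    => 0
    case ("003_Where did you grow up", "In a town")       => 1
    case ("003_Where did you grow up", "Multiple places") => 2
    case _ => -1
  }

// calculateScore(baseDF(c), baseDF(c)) supplies the cell value in both
// positions, so no (columnName, answerText) pair can match.
val buggy = score("Female", "Female")

// calculateScore(lit(c), df(c)) supplies the column name first and the
// cell value second, which is what the match expects.
val fixed = score("002_Gender", "Female")
```

Here `buggy` falls through to the default branch, while `fixed` hits the first case.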
    
