首页 文章

如何将一个数据框中的编辑合并到Spark中的另一个数据框中?

提问于
浏览
0

我有一个包含150列和多行的数据帧 df1 . 我还有一个数据框 df2 具有相同的模式,但很少有行包含应该应用于 df1 的编辑(有一个键列ID来标识要更新的行) . df2 仅包含已填充更新的列 . 其他列为null . 我想要做的是通过以下方式更新 df1 中的行与dataframe df2 中的对应行:

  • 如果 df2 中的列为空,则不应导致 df1 中的任何更改

  • 如果 df2 中的列包含波形符"~",则应该导致 df1 中的该列无效

  • 否则 df1 中的列中的值应替换为 df2 中的值

我该怎么做才能做到最好?是否可以通用的方式完成而不列出所有列,而是迭代它们?可以使用dataframe API完成,还是需要切换到RDD?

(当然,通过更新数据帧df1,我的意思是创建一个新的,更新的数据帧 . )

示例

假设模式是:id:Int,name:String,age:Int .

df1 是:

1,"Greg",18
2,"Kate",25
3,"Chris",30

df2 是:

1,"Gregory",null
2,~,26

updated dataframe 应如下所示:

1,"Gregory",18
2,null,26
3,"Chris",30

2 回答

  • 0

    我想通过中间转换到RDD来实现它 . 首先,创建一个映射idsToEdits,其中键是行ID,值是列号到值的映射(只有非空值) .

    val idsToEdits=df2.rdd.map{row=>
      (row(0),
       row.getValuesMap[AnyVal](row.schema.fieldNames.filterNot(colName=>row.isNullAt(row.fieldIndex(colName))))
      .map{case (k,v)=> (row.fieldIndex(k),if(v=="~") null else v)} )
    }.collectAsMap()
    

    Broadast,用于映射和定义更新行的editRow函数 .

    val idsToEditsBr=sc.broadcast(idsToEdits)
    import org.apache.spark.sql.Row
    val editRow:Row=>Row={ row =>
      idsToEditsBr
        .value
        .get(row(0))
        .map{edits => Row.fromSeq(edits.foldLeft(row.toSeq){case (rowSeq,
    (idx,newValue))=>rowSeq.updated(idx,newValue)})}
        .getOrElse(row)
    }
    

    最后,在源自df1的RDD上使用该函数并转换回数据帧 .

    val updatedDF=spark.createDataFrame(df1.rdd.map(editRow),df1.schema)
    
  • 0

    听起来你的问题是如何在没有明确命名所有列的情况下执行此操作,因此我假设你有一些“doLogic”udf函数或数据帧函数来加入后执行你的逻辑 .

    import org.apache.spark.sql.types.StringType
    
    val cols = df1.schema.filterNot(x => x.name == "id").map({ x =>
        if (x.dataType == StringType) {
            doLogicUdf(col(x), col(x + "2"))) 
        } else {
            when(col(x + "2").isNotNull, col(x + "2")).otherwise(col(x))
        }
    }) :+ col("id")
    val df2 = df2.select(df2.columns.map( x=> col(x).alias(x+"2")) : _*)) 
    df1.join(df2, col("id") ===col("id2") , "inner").select(cols : _*)
    

相关问题