Spark: joining dataframes and merging schemas

I have several dataframes in Spark whose schemas start with the same columns (the headers) and end with different ones (the custom columns).

case class First(header1:String, header2:String, header3:Int, custom1:String)
case class Second(header1:String, header2:String, header3:Int, custom1:String, custom5:String)
case class Third(header1:String, header2:String, header3:Int, custom2:String, custom3:Int, custom4:Double)

val first = Seq(First("A", "Ba1", 1, "custom1"), First("A", "Ba2", 2, "custom2")).toDS
val second = Seq(Second("B", "Bb1", 1, "custom12", "custom5"), Second("B", "Bb2", 22, "custom12", "custom55")).toDS
val third = Seq(Third("A", "Bc1", 1, "custom2", 22, 44.4)).toDS

These might look like:

+-------+-------+-------+-------+
|header1|header2|header3|custom1|
+-------+-------+-------+-------+
|      A|    Ba1|      1|custom1|
|      A|    Ba2|      2|custom2|
+-------+-------+-------+-------+


+-------+-------+-------+--------+--------+
|header1|header2|header3| custom1| custom5|
+-------+-------+-------+--------+--------+
|      B|    Bb1|      1|custom12| custom5|
|      B|    Bb2|     22|custom12|custom55|
+-------+-------+-------+--------+--------+


+-------+-------+-------+-------+-------+-------+
|header1|header2|header3|custom2|custom3|custom4|
+-------+-------+-------+-------+-------+-------+
|      A|    Bc1|      1|custom2|     22|   44.4|
+-------+-------+-------+-------+-------+-------+

How can I merge the schemas, essentially concatenating all the dataframes into a single one with a schema like

case class All(header1:String, header2:String, header3:Int, custom1:Option[String], custom2:Option[String],
               custom3:Option[Int], custom4:Option[Double], custom5:Option[String], `type`:String)

where the columns that do not exist in a given frame are nullable?
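
If Spark 3.1+ is available, unionByName with allowMissingColumns = true performs exactly this kind of schema merge, filling absent columns with nulls (a minimal sketch against the Datasets above, assuming Spark 3.1+):

// Spark 3.1+: columns are matched by name and missing ones are filled with null
val merged = first.toDF
  .unionByName(second.toDF, allowMissingColumns = true)
  .unionByName(third.toDF, allowMissingColumns = true)
merged.printSchema()  // header1..header3 plus custom1..custom5, the custom ones nullable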

If the first record comes from the dataframe named first, the output should look like:

+-------+-------+-------+-------+-------+-------+-------+-------+
|header1|header2|header3|custom1|custom2|custom3|custom4|custom5|
+-------+-------+-------+-------+-------+-------+-------+-------+
|      A|    Ba1|      1|custom1|   null|   null|   null|   null|
+-------+-------+-------+-------+-------+-------+-------+-------+

I was thinking about joining the dataframes on the header columns. However, only some of them (say header1) would hold the same (actually joinable) values, while the others (header2, header3) would hold different values, i.e.

first
    .join(second, Seq("header1", "header2", "header3"), "LEFT")
    .join(third, Seq("header1", "header2", "header3"), "LEFT")
  .show

which results in

+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|header1|header2|header3|custom1|custom1|custom5|custom2|custom3|custom4|
+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|      A|    Ba1|      1|custom1|   null|   null|   null|   null|   null|
|      A|    Ba2|      2|custom2|   null|   null|   null|   null|   null|
+-------+-------+-------+-------+-------+-------+-------+-------+-------+

This is incorrect, because I really just want to pd.concat(axis=0) the dataframes, i.e. most of the records would simply have missing values. It is also missing a type column identifying the original dataframe, i.e. first, second, third.
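
Tagging each frame with a literal before the union supplies that missing type column (a sketch, again assuming the Spark 3.1+ unionByName; the tag values are just the frame names):

import org.apache.spark.sql.functions.lit

// tag every row with the name of its source frame, then merge by column name
val all = first.toDF.withColumn("type", lit("first"))
  .unionByName(second.toDF.withColumn("type", lit("second")), allowMissingColumns = true)
  .unionByName(third.toDF.withColumn("type", lit("third")), allowMissingColumns = true)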

Edit

I think a classic full outer join is the solution:

first
    .join(second, Seq("header1", "header2", "header3"), "fullouter")
    .join(third, Seq("header1", "header2", "header3"), "fullouter")
  .show

which yields:

+-------+-------+-------+-------+--------+--------+-------+-------+-------+
|header1|header2|header3|custom1| custom1| custom5|custom2|custom3|custom4|
+-------+-------+-------+-------+--------+--------+-------+-------+-------+
|      A|    Ba1|      1|custom1|    null|    null|   null|   null|   null|
|      A|    Ba2|      2|custom2|    null|    null|   null|   null|   null|
|      B|    Bb1|      1|   null|custom12| custom5|   null|   null|   null|
|      B|    Bb2|     22|   null|custom12|custom55|   null|   null|   null|
|      A|    Bc1|      1|   null|    null|    null|custom2|     22|   44.4|
+-------+-------+-------+-------+--------+--------+-------+-------+-------+

As you can see, there is actually never a real join; the rows are simply concatenated. Is there a simpler operation to achieve the same? Also, this answer is not optimal, because custom1 is a duplicated column name. I would rather see a single custom1 column (with no null value if there is a second one to fill it).
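
One way to keep the join approach while avoiding the duplicate name is to rename custom1 on one side before joining and coalesce the two columns afterwards (a sketch; custom1_b is an illustrative name, not from the original post):

import org.apache.spark.sql.functions.{coalesce, col}

// rename the clashing column, join, then fold the two variants back into one
val joined = first
  .join(second.withColumnRenamed("custom1", "custom1_b"), Seq("header1", "header2", "header3"), "fullouter")
  .withColumn("custom1", coalesce(col("custom1"), col("custom1_b")))
  .drop("custom1_b")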

2 Answers

  • 2

    Take a look at my comment to a similar question. Basically you need to union all of the frames. To make the schemas match, add each missing column as a typed null via dataframe.withColumn(columnName, lit(null).cast(...)):

    import org.apache.spark.sql.functions.{col, lit}
    import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType}
    // add each missing custom column as a typed null so all three frames share one schema
    val first1  = first.withColumn("custom2", lit(null).cast(StringType)).withColumn("custom3", lit(null).cast(IntegerType))
                       .withColumn("custom4", lit(null).cast(DoubleType)).withColumn("custom5", lit(null).cast(StringType))
    val second2 = second.withColumn("custom2", lit(null).cast(StringType)).withColumn("custom3", lit(null).cast(IntegerType))
                        .withColumn("custom4", lit(null).cast(DoubleType))
    val third3  = third.withColumn("custom1", lit(null).cast(StringType)).withColumn("custom5", lit(null).cast(StringType))
    // unionAll matches columns by position, so align the column order first
    val result  = first1.unionAll(second2.select(first1.columns.map(col): _*)).unionAll(third3.select(first1.columns.map(col): _*))
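
    The same idea generalizes: folding over a target column list pads any frame (a sketch; the pad helper and its column list are illustrative, not from the original answer):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.types.DataType

    // hypothetical helper: add each column the frame lacks as a typed null, then fix the order
    def pad(df: DataFrame, allCols: Seq[(String, DataType)]): DataFrame =
      allCols.foldLeft(df) { case (d, (name, tpe)) =>
        if (d.columns.contains(name)) d else d.withColumn(name, lit(null).cast(tpe))
      }.select(allCols.map { case (name, _) => col(name) }: _*)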
    
  • 1

    Test the SQL UNION ALL approach, to see whether it gives the desired result (with the nulls cast to the target column types so the union branches line up):

    SELECT header1,
           header2,
           header3,
           custom1,
           CAST(NULL AS STRING) AS custom2,
           CAST(NULL AS INT)    AS custom3,
           CAST(NULL AS DOUBLE) AS custom4,
           CAST(NULL AS STRING) AS custom5
    FROM   table1
    UNION ALL
    SELECT header1,
           header2,
           header3,
           custom1,
           CAST(NULL AS STRING) AS custom2,
           CAST(NULL AS INT)    AS custom3,
           CAST(NULL AS DOUBLE) AS custom4,
           custom5
    FROM   table2
    UNION ALL
    SELECT header1,
           header2,
           header3,
           CAST(NULL AS STRING) AS custom1,
           custom2,
           custom3,
           custom4,
           CAST(NULL AS STRING) AS custom5
    FROM   table3;
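
    To run this in Spark, the three frames can be registered as temporary views under the names used above (a sketch; unionAllQuery is an assumed variable holding the SQL string shown):

    // register the frames under the table names referenced in the query
    first.createOrReplaceTempView("table1")
    second.createOrReplaceTempView("table2")
    third.createOrReplaceTempView("table3")

    val result = spark.sql(unionAllQuery)  // unionAllQuery: the UNION ALL statement above
    result.show()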
    
