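I have several DataFrames in Spark that share a partially similar schema: a common set of leading columns (headers), followed by different custom columns.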
case class First(header1:String, header2:String, header3:Int, custom1:String)
case class Second(header1:String, header2:String, header3:Int, custom1:String, custom5:String)
case class Third(header1:String, header2:String, header3:Int, custom2:String, custom3:Int, custom4:Double)
val first = Seq(First("A", "Ba1", 1, "custom1"), First("A", "Ba2", 2, "custom2")).toDS
val second = Seq(Second("B", "Bb1", 1, "custom12", "custom5"), Second("B", "Bb2", 22, "custom12", "custom55")).toDS
val third = Seq(Third("A", "Bc1", 1, "custom2", 22, 44.4)).toDS
These might look like:
+-------+-------+-------+-------+
|header1|header2|header3|custom1|
+-------+-------+-------+-------+
|      A|    Ba1|      1|custom1|
|      A|    Ba2|      2|custom2|
+-------+-------+-------+-------+
+-------+-------+-------+--------+--------+
|header1|header2|header3| custom1| custom5|
+-------+-------+-------+--------+--------+
|      B|    Bb1|      1|custom12| custom5|
|      B|    Bb2|     22|custom12|custom55|
+-------+-------+-------+--------+--------+
+-------+-------+-------+-------+-------+-------+
|header1|header2|header3|custom2|custom3|custom4|
+-------+-------+-------+-------+-------+-------+
|      A|    Bc1|      1|custom2|     22|   44.4|
+-------+-------+-------+-------+-------+-------+
How can I merge the schemas so that all the DataFrames are concatenated under a single schema, such as
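// custom2 added and custom3 typed as Int to match Third; `type` is a Scala keyword, so it needs backticks
case class All(header1:String, header2:String, header3:Int, custom1:Option[String], custom2:Option[String],
custom3:Option[Int], custom4:Option[Double], custom5:Option[String], `type`:String)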
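where the columns that are not present in a given DataFrame would be null?
If the first record comes from the DataFrame named first, the output should look like this: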
+-------+-------+-------+-------+-------+-------+-------+-------+
|header1|header2|header3|custom1|custom2|custom3|custom4|custom5|
+-------+-------+-------+-------+-------+-------+-------+-------+
|      A|      B|      1|custom1|    Nan|    Nan|    Nan|    Nan|
+-------+-------+-------+-------+-------+-------+-------+-------+
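I was thinking about joining the DataFrames on the header columns. However, only some of them (say header1) hold the same (actually joinable) values, while the others (header2, header3) hold different values, i.e.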
first
.join(second, Seq("header1", "header2", "header3"), "LEFT")
.join(third, Seq("header1", "header2", "header3"), "LEFT")
.show
which results in
+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|header1|header2|header3|custom1|custom1|custom5|custom2|custom3|custom4|
+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|      A|    Ba1|      1|custom1|   null|   null|   null|   null|   null|
|      A|    Ba2|      2|custom2|   null|   null|   null|   null|   null|
+-------+-------+-------+-------+-------+-------+-------+-------+-------+
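is not correct, because I only want the equivalent of pd.concat(axis=0) over the DataFrames, and here most of the records are missing. It also lacks a type column identifying the DataFrame each row came from, i.e. first, second, third.
Edit
I think a classic full outer join is the solution: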
first
.join(second, Seq("header1", "header2", "header3"), "fullouter")
.join(third, Seq("header1", "header2", "header3"), "fullouter")
.show
yields:
+-------+-------+-------+-------+--------+--------+-------+-------+-------+
|header1|header2|header3|custom1| custom1| custom5|custom2|custom3|custom4|
+-------+-------+-------+-------+--------+--------+-------+-------+-------+
|      A|    Ba1|      1|custom1|    null|    null|   null|   null|   null|
|      A|    Ba2|      2|custom2|    null|    null|   null|   null|   null|
|      A|    Bb1|      1|   null|custom12| custom5|   null|   null|   null|
|      A|    Bb2|     22|   null|custom12|custom55|   null|   null|   null|
|      A|    Bc1|      1|   null|    null|    null|custom2|     22|   44.4|
+-------+-------+-------+-------+--------+--------+-------+-------+-------+
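As you can see, no rows are ever actually matched by the join; the rows are simply concatenated. Is there a simpler operation that achieves the same thing? This approach is also not optimal, because custom1 ends up as a duplicated column name. I would rather see a single custom1 column (with no nulls wherever the second DataFrame has a value to fill in).
2 Answers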
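Take a look at my comment to similar question. Basically you need to union all the frames. To make the schemas match, add each missing column with an expression like
dataframe.withColumn(ColumnName, expr("null"))
Try the SQL union approach and check whether it gives the desired result.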
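To make the union suggestion concrete, here is a minimal spark-shell style sketch. It is not taken from the linked comment: the `target` schema list and the `conform` helper are hypothetical names introduced for illustration, and the sample rows are rebuilt from tuples instead of the question's case classes. Missing columns are added as typed nulls (`lit(null).cast(...)` rather than a bare `expr("null")`, so every frame ends up with identical column types), a `type` column records the source frame, and the frames are combined with `unionByName`.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, StringType}

// Local session for experimentation; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("union-schemas").getOrCreate()
import spark.implicits._

// Same sample data as in the question, built from tuples.
val first = Seq(("A", "Ba1", 1, "custom1"), ("A", "Ba2", 2, "custom2"))
  .toDF("header1", "header2", "header3", "custom1")
val second = Seq(("B", "Bb1", 1, "custom12", "custom5"), ("B", "Bb2", 22, "custom12", "custom55"))
  .toDF("header1", "header2", "header3", "custom1", "custom5")
val third = Seq(("A", "Bc1", 1, "custom2", 22, 44.4))
  .toDF("header1", "header2", "header3", "custom2", "custom3", "custom4")

// Assumed target schema: column name -> type, in the desired output order.
val target: Seq[(String, DataType)] = Seq(
  "header1" -> StringType, "header2" -> StringType, "header3" -> IntegerType,
  "custom1" -> StringType, "custom2" -> StringType, "custom3" -> IntegerType,
  "custom4" -> DoubleType, "custom5" -> StringType)

// Hypothetical helper: keep the columns a frame already has, fill the rest
// with typed nulls, and tag every row with the name of its source frame.
def conform(df: DataFrame, source: String): DataFrame = {
  val cols = target.map { case (name, dataType) =>
    if (df.columns.contains(name)) col(name)
    else lit(null).cast(dataType).as(name)
  }
  df.select((cols :+ lit(source).as("type")): _*)
}

// Row-wise concatenation, comparable to pd.concat(axis=0).
val all = Seq((first, "first"), (second, "second"), (third, "third"))
  .map { case (df, name) => conform(df, name) }
  .reduce((a, b) => a.unionByName(b))

all.show()
```

Because every frame is first projected onto the same columns, there is only one custom1 column and no join is involved. On Spark 3.1 or later, `a.unionByName(b, allowMissingColumns = true)` fills missing columns with nulls on its own, which would make the helper unnecessary apart from adding the `type` column.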