我正在使用Apache Spark来生成镶木地板文件 . 我可以按日期对它们进行分区而没有任何问题,但在内部我似乎无法以正确的顺序布置数据 .
订单似乎在处理期间丢失,这意味着镶木地板元数据不正确(具体来说,我希望确保镶木地板行组反映排序顺序,以便特定于我的用例的查询可以通过元数据有效过滤) .
请考虑以下示例:
// note: hbase source is a registered temp table generated from hbase
val transformed = sqlContext.sql(s"SELECT id, sampleTime, ... ,
toDate(sampleTime) as date FROM hbaseSource")
// Repartion the input set by the date column ( in my source there should be
2 distinct dates)
val sorted = transformed.repartition($"date").sortWithinPartitions("id",
"sampleTime")
sorted.coalesce(1).write.partitionBy("date").parquet(s"/outputFiles")
通过这种方法,我得到了正确的镶木地板分区结构(按日期) . 甚至更好,对于每个日期分区,我看到一个大的镶木地板文件 .
/outputFiles/date=2018-01-01/part-00000-4f14286c-6e2c-464a-bd96-612178868263.snappy.parquet
但是,当我查询文件时,我看到的内容不按顺序 . 具体而言,“乱序”似乎更像是几个有序的数据帧分区已合并到文件中 .
镶木地板行组元数据显示已排序的字段实际上是重叠的(例如,特定的id可能位于许多行组中):
id: :[min: 54, max: 65012, num_nulls: 0]
sampleTime: :[min: 1514764810000000, max: 1514851190000000, num_nulls: 0]
id: :[min: 827, max: 65470, num_nulls: 0]
sampleTime: :[min: 1514764810000000, max: 1514851190000000, num_nulls: 0]
id: :[min: 1629, max: 61412, num_nulls: 0]
我希望在每个文件中正确排序数据,因此每个行组中的元数据最小值/最大值不重叠 .
例如,这是我想看到的模式:
RG 0: id: :[min: 54, max: 100, num_nulls: 0]
RG 1: id: :[min: 100, max: 200, num_nulls: 0]
...其中RG =“行组” . 如果我想要id = 75,查询可以在一个行组中找到它 .
我已尝试过上述代码的许多变体 . 例如,有和没有 coalesce
(我知道合并是坏的,但我的想法是用它来防止洗牌) . 我也尝试过 sort
而不是 sortWithinPartitions
(sort应该创建一个总排序,但会产生很多分区) . 例如:
val sorted = transformed.repartition($"date").sort("id", "sampleTime")
sorted.write.partitionBy("date").parquet(s"/outputFiles")
给了我200个文件,这个文件太多了,但它们仍然没有正确排序 . 我可以通过调整shuffle大小来减少文件数量,但我希望在写入期间按顺序处理排序(我的印象是写入没有改变输入) . 我看到的顺序如下(为简洁起见省略了其他字段):
+----------+----------------+
|id| sampleTime|
+----------+----------------+
| 56868|1514840220000000|
| 57834|1514785180000000|
| 56868|1514840220000000|
| 57834|1514785180000000|
| 56868|1514840220000000|
看起来像是交错排序的分区 . 所以我认为 repartition
在这里买不到任何东西,并且 sort
似乎无法保持写入步骤的顺序 .
我甚至尝试过Ryan Blue在演示文稿"Parquet performance tuning: The missing guide"中概述的方法(不幸的是它落后于OReily付费专区) . 这涉及到使用 insertInto
. 在那种情况下,spark似乎使用了旧版本的parquet-mr,它破坏了元数据,我不知道如何升级它 .
我不确定我做错了什么 . 我的感觉是我误解了 repartition($"date")
和 sort
工作和/或互动的方式 .
我很感激任何想法 . 为论文道歉 . :)
编辑:另请注意,如果我在 transformed.sort("id", "sampleTime")
上执行show(n),则数据会正确排序 . So it seems like the problem occurs during the write stage. 如上所述,在写入期间,似乎排序的输出似乎是混乱的 .
1 回答
只是想法,在合并后排序:“ . cookles(1).sortWithinPartitions()” . 预期结果看起来也很奇怪 - 为什么要求镶木地板的数据?阅读后的排序看起来更合适 .