首页 文章

无法在Spark中将有序数据写入镶木地板

提问于
浏览
1

我正在使用Apache Spark来生成镶木地板文件 . 我可以按日期对它们进行分区而没有任何问题,但在内部我似乎无法以正确的顺序布置数据 .

订单似乎在处理期间丢失,这意味着镶木地板元数据不正确(具体来说,我希望确保镶木地板行组反映排序顺序,以便特定于我的用例的查询可以通过元数据有效过滤) .

请考虑以下示例:

// note: hbase source is a registered temp table generated from hbase
val transformed = sqlContext.sql(s"SELECT  id, sampleTime, ... , 
toDate(sampleTime) as date FROM hbaseSource")

// Repartion the input set by the date column ( in my source there should be 
2 distinct dates)
val sorted = transformed.repartition($"date").sortWithinPartitions("id", 
"sampleTime")

sorted.coalesce(1).write.partitionBy("date").parquet(s"/outputFiles")

通过这种方法,我得到了正确的镶木地板分区结构(按日期) . 甚至更好,对于每个日期分区,我看到一个大的镶木地板文件 .

/outputFiles/date=2018-01-01/part-00000-4f14286c-6e2c-464a-bd96-612178868263.snappy.parquet

但是,当我查询文件时,我看到的内容不按顺序 . 具体而言,“乱序”似乎更像是几个有序的数据帧分区已合并到文件中 .

镶木地板行组元数据显示已排序的字段实际上是重叠的(例如,特定的id可能位于许多行组中):

id:             :[min: 54, max: 65012, num_nulls: 0]
sampleTime:     :[min: 1514764810000000, max: 1514851190000000, num_nulls: 0]
id:             :[min: 827, max: 65470, num_nulls: 0]
sampleTime:     :[min: 1514764810000000, max: 1514851190000000, num_nulls: 0]
id:             :[min: 1629, max: 61412, num_nulls: 0]

我希望在每个文件中正确排序数据,因此每个行组中的元数据最小值/最大值不重叠 .

例如,这是我想看到的模式:

RG 0: id:             :[min: 54, max: 100, num_nulls: 0]
RG 1: id:             :[min: 100, max: 200, num_nulls: 0]

...其中RG =“行组” . 如果我想要id = 75,查询可以在一个行组中找到它 .

我已尝试过上述代码的许多变体 . 例如,有和没有 coalesce (我知道合并是坏的,但我的想法是用它来防止洗牌) . 我也尝试过 sort 而不是 sortWithinPartitions (sort应该创建一个总排序,但会产生很多分区) . 例如:

val sorted = transformed.repartition($"date").sort("id", "sampleTime") 
sorted.write.partitionBy("date").parquet(s"/outputFiles")

给了我200个文件,这个文件太多了,但它们仍然没有正确排序 . 我可以通过调整shuffle大小来减少文件数量,但我希望在写入期间按顺序处理排序(我的印象是写入没有改变输入) . 我看到的顺序如下(为简洁起见省略了其他字段):

+----------+----------------+
|id|      sampleTime|
+----------+----------------+
|     56868|1514840220000000|
|     57834|1514785180000000|
|     56868|1514840220000000|
|     57834|1514785180000000|
|     56868|1514840220000000|

看起来像是交错排序的分区 . 所以我认为 repartition 在这里买不到任何东西,并且 sort 似乎无法保持写入步骤的顺序 .

我甚至尝试过Ryan Blue在演示文稿"Parquet performance tuning: The missing guide"中概述的方法(不幸的是它落后于OReily付费专区) . 这涉及到使用 insertInto . 在那种情况下,spark似乎使用了旧版本的parquet-mr,它破坏了元数据,我不知道如何升级它 .

我不确定我做错了什么 . 我的感觉是我误解了 repartition($"date")sort 工作和/或互动的方式 .

我很感激任何想法 . 为论文道歉 . :)

编辑:另请注意,如果我在 transformed.sort("id", "sampleTime") 上执行show(n),则数据会正确排序 . So it seems like the problem occurs during the write stage. 如上所述,在写入期间,似乎排序的输出似乎是混乱的 .

1 回答

  • 0

    只是想法,在合并后排序:“ . cookles(1).sortWithinPartitions()” . 预期结果看起来也很奇怪 - 为什么要求镶木地板的数据?阅读后的排序看起来更合适 .

相关问题