我的数据原则上是一个表,除了其他'data'之外,它还包含一列 ID
和一列 GROUP_ID
.
在第一步中,我将CSV读入Spark,进行一些处理以准备第二步的数据,并将数据写为镶木地板 . 第二步做了很多 groupBy('GROUP_ID')
和 Window.partitionBy('GROUP_ID').orderBy('ID')
.
现在的目标是 - 为了避免第二步中的混乱 - 在第一步中有效地加载数据,因为这是一个单一计时器 .
Question Part 1: AFAIK,Spark从镶木地板加载时保留了分区(这实际上是任何"optimized write consideration"的基础) - 对吗?
我提出了三种可能性:
-
df.orderBy('ID').write.partitionBy('TRIP_ID').parquet('/path/to/parquet')
-
df.orderBy('ID').repartition(n, 'TRIP_ID').write.parquet('/path/to/parquet')
-
df.repartition(n, 'TRIP_ID').sortWithinPartitions('ID').write.parquet('/path/to/parquet')
我会设置 n
这样单个拼花文件大约是100MB .
Question Part 2: 这三个选项在目标方面是否产生"the same" /类似结果(第二步避免改组)是否正确?如果没有,有什么区别?哪一个是'better'?
Question Part 3: 三个选项中哪一个在第1步中表现更好?
感谢您分享您的知识!
EDIT 2017-07-24
在做了一些测试(写入和读取镶木地板)之后,似乎Spark在第二步中默认无法恢复 partitionBy
和 orderBy
信息 . 分区数(从 df.rdd.getNumPartitions()
获得似乎由核心数和/或 spark.default.parallelism
(如果设置)确定,但不是由镶木地板分区数决定 . 所以 answer for question 1 将是 WRONG ,问题2和3将是无关 .
所以事实证明 REAL QUESTION 是:有没有办法告诉Spark,数据已经被列 X 分区并按列 Y 排序?
2 回答
据我所知,没有办法从镶木地板中读取数据,并告诉Spark它已经被某个表达式分区并被命令 .
简而言之,HDFS等上的一个文件对于一个Spark分区来说太大了 . 即使您将整个文件读取到使用Parquet属性(例如
parquet.split.files=false
,parquet.task.side.metadata=true
等)的一个分区,也只会有一个shuffle与大多数成本相比 .试试bucketBy . 此外,分区发现可以提供帮助 .