我已经开始在Spark 1.4.0中使用Spark SQL和DataFrames . 我想在Scala中定义DataFrame上的自定义分区程序,但是没有看到如何执行此操作 .
我正在使用的一个数据表包含一个事务列表,按帐户,silimar到下面的示例 .
Account Date Type Amount
1001 2014-04-01 Purchase 100.00
1001 2014-04-01 Purchase 50.00
1001 2014-04-05 Purchase 70.00
1001 2014-04-01 Payment -150.00
1002 2014-04-01 Purchase 80.00
1002 2014-04-02 Purchase 22.00
1002 2014-04-04 Payment -120.00
1002 2014-04-04 Purchase 60.00
1003 2014-04-02 Purchase 210.00
1003 2014-04-03 Purchase 15.00
至少在最初,大多数计算将发生在帐户内的交易之间 . 所以我希望对数据进行分区,以便帐户的所有事务都在同一个Spark分区中 .
但我没有看到定义这个的方法 . DataFrame类有一个名为'repartition(Int)'的方法,您可以在其中指定要创建的分区数 . 但我没有看到任何方法可用于为DataFrame定义自定义分区程序,例如可以为RDD指定 .
源数据存储在Parquet中 . 我确实看到在向Parquet编写DataFrame时,您可以指定要分区的列,因此我可以告诉Parquet通过“帐户”列对其数据进行分区 . 但是可能有数百万个帐户,如果我正确理解Parquet,它会为每个帐户创建一个独特的目录,这听起来不是一个合理的解决方案 .
有没有办法让Spark对此DataFrame进行分区,以便帐户的所有数据都在同一个分区中?
5 回答
Spark> = 2.3.0
SPARK-22614公开范围分区 .
SPARK-22389在Data Source API v2中公开外部格式分区 .
Spark> = 1.6.0
在Spark> = 1.6中,可以按列使用分区进行查询和缓存 . 请参阅:SPARK-11410和SPARK-4849使用
repartition
方法:与
RDDs
不同,SparkDataset
(包括Dataset[Row]
a.k.aDataFrame
)目前无法使用自定义分区程序 . 您通常可以通过创建人工分区列来解决这个问题,但它不会给您相同的灵活性 .Spark <1.6.0:
您可以做的一件事是在创建_545206之前预分区输入数据
由于
DataFrame
从RDD
创建只需要一个简单的 Map 阶段,现有的分区布局应该保留*:您可以使用相同的方式重新分配现有的
DataFrame
:所以看起来并非不可能 . 问题仍然存在,如果它有意义的话 . 我认为大多数时候它不会:
重新分区是一个昂贵的过程 . 在典型情况下,大多数数据必须被序列化,洗牌和反序列化 . 另一方面,可以从预分区数据中受益的操作数量相对较小,并且如果内部API不是为了利用该属性而进一步限制 .
在某些情况下加入,但需要内部支持,
窗口函数调用匹配的分区程序 . 与上面相同,仅限于单个窗口定义 . 它已经在内部进行了分区,因此预分区可能是多余的,
使用
GROUP BY
进行简单聚合 - 可以减少临时缓冲区**的内存占用量,但总体成本要高得多 . 或多或少相当于
groupByKey.mapValues(_.reduce)
(当前行为)vsreduceByKey
(预分区) . 不太可能在实践中有用 .使用
SqlContext.cacheTable
进行数据压缩 . 由于它看起来像是使用行程编码,因此应用
OrderedRDDFunctions.repartitionAndSortWithinPartitions
可以提高压缩率 .性能高度依赖于密钥的分布 . 如果它是偏斜的,将导致次优的资源利用率 . 在最糟糕的情况下,根本不可能完成这项工作 .
使用高级声明性API的一个重点是将自己与低级实现细节隔离开来 . 正如@dwysakowicz和@RomiKuntsman已经提到的那样,优化是Catalyst Optimizer的工作 . 这是一个非常复杂的野兽,我真的怀疑你可以很容易地改进它,而不会深入到其内部 .
相关概念
Partitioning with JDBC sources :
JDBC数据源支持predicates argument . 它可以使用如下:
它为每个谓词创建一个JDBC分区 . 请记住,如果使用单个谓词创建的集合不是不相交的,那么您将在结果表中看到重复项 .
partitionBy method in DataFrameWriter :
Spark
DataFrameWriter
提供partitionBy
方法,可用于写入"partition"数据 . 它使用提供的列集分隔写入数据这样可以根据键启用谓词下推读取查询:
但它不等于
DataFrame.repartition
. 特别是聚合:仍然需要
TungstenExchange
:bucketBy method in DataFrameWriter (Spark> = 2.0):
bucketBy
具有与partitionBy
类似的应用程序,但它仅适用于表(saveAsTable
) . Bucketing信息可用于优化连接:*通过分区布局我的意思是只有数据分布 .
partitioned
RDD不再是分区程序 . **假设没有早期预测 . 如果聚合只涵盖小列的子集可能没有任何收益 .在Spark <1.6中如果创建
HiveContext
,而不是普通的旧SqlContext
,则可以使用HiveQLDISTRIBUTE BY colX...
(确保每个N减少器获得x的非重叠范围)和CLUSTER BY colX...
(分配依据和排序依据的快捷方式);不确定这如何适用于Spark DF api . 普通的SqlContext不支持这些关键字(请注意,您不需要使用Hive元文件来使用HiveContext)
编辑:Spark 1.6现在在本机DataFrame API中有这个
所以从某种答案开始:) - 你做不到
我不是专家,但据我了解DataFrames,它们不等于rdd,而DataFrame没有Partitioner这样的东西 .
通常,DataFrame的想法是提供另一个抽象级别来处理这些问题本身 . DataFrame上的查询被转换为逻辑计划,进一步转换为RDD上的操作 . 您建议的分区可能会自动应用,或者至少应该应用 .
如果你不相信SparkSQL会提供某种最佳工作,你可以按照注释中的建议将DataFrame转换为RDD [Row] .
使用返回的DataFrame:
没有明确的方法在DataFrame上使用partitionBy,仅在PairRDD上,但是当您对DataFrame进行排序时,它将在其LogicalPlan中使用它,这将有助于您在每个帐户上进行计算时 .
我只是偶然发现了同样的问题,我希望按帐户划分数据帧 . 我假设当你说“想要对数据进行分区以便一个帐户的所有事务都在同一个Spark分区中”时,你想要它的规模和性能,但你的代码并不依赖它(比如使用mapPartitions()等),对吧?
我能够使用RDD做到这一点 . 但我不知道这是否适合您 . 将DF作为RDD提供后,可以应用repartitionAndSortWithinPartitions来执行数据的自定义重新分区 .
这是我使用的示例: