Filter early and often . 假设未对数据源进行预处理以进行缩减,那么您需要处理的最早且最佳的减少数据量的地方是初始数据查询 . 这通常通过添加where子句来实现 . 请勿携带任何不必要的数据来获取目标结果 . 引入任何额外数据将影响可以通过网络混洗多少数据并写入磁盘 . 不必要地移动数据是一个真正的杀手,应该不惜一切代价避免
At each step you should look for opportunities to filter, distinct, reduce, or aggregate the data as much as possible prior to proceeding to the operation.
2 回答
输入拆分大小由配置
spark.cassandra.input.split.size_in_mb
控制 . 每个拆分都会在Spark中生成一个任务,因此,Cassandra中的数据越多,处理所需的时间就越长(这就是您所期望的)要提高性能,请确保使用
joinWithCassandraTable
对齐分区 . 除非您绝对需要表中的所有数据并使用select
优化检索到的数据以仅投影所需的列,否则请勿使用context.cassandraTable(...)
.如果您需要来自某些行的数据,那么构建一个存储这些行的id的辅助表是有意义的 .
二级索引也可以帮助选择数据的子集,但我已经看到了如果不是高性能的报告 .
继maasgs回答之后,不是在SparkConf上设置
spark.cassandra.input.split.size_in_mb.
,而是在从单个作业中的不同键空间/数据中心读取时使用ReadConf配置是有用的:就性能提升而言,这取决于您正在运行的作业以及所需的转换类型 . 下面概述了最大化Spark-Cassandra性能的一般建议(可以找到here) .
Your choice of operations and the order in which they are applied is critical to performance.
You must organize your processes with task distribution and memory in mind.
首先要确定您的数据是否已正确分区 . 此上下文中的分区仅仅是数据块 . 如果可能,在Spark甚至摄取数据之前对数据进行分区 . 如果这不可行或不可行,您可以选择在加载后立即重新分区数据 . 您可以重新分区以增加分区数或合并以减少分区数 .
作为下限,分区数应至少是将对数据进行操作的核心数量的2倍 . 话虽如此,您还需要确保执行的任何任务至少花费100毫秒来证明网络分发的合理性 . 请注意,重新分区将始终导致混乱,而合并通常不会 . 如果您使用过MapReduce,那么您知道改组是实际工作中大部分时间 .
Filter early and often . 假设未对数据源进行预处理以进行缩减,那么您需要处理的最早且最佳的减少数据量的地方是初始数据查询 . 这通常通过添加where子句来实现 . 请勿携带任何不必要的数据来获取目标结果 . 引入任何额外数据将影响可以通过网络混洗多少数据并写入磁盘 . 不必要地移动数据是一个真正的杀手,应该不惜一切代价避免
At each step you should look for opportunities to filter, distinct, reduce, or aggregate the data as much as possible prior to proceeding to the operation.
尽可能使用管道 . 管道是一系列转换,表示对一个数据的独立操作,并且不需要整体重新组织数据(随机播放) . 例如:字符串中的映射 - >字符串长度是独立的,其中按值排序需要与其他数据元素进行比较以及通过网络重新组织数据(shuffle) .
在需要随机播放的作业中,查看是否可以在随机播放步骤之前使用部分聚合或缩减(类似于MapReduce中的组合器) . 这将减少混洗阶段的数据移动 .
一些昂贵且需要随机播放的常见任务是分类,按键分组,按键减少 . 这些操作要求将数据与昂贵的其他数据元素进行比较 . 了解Spark API非常重要,可以选择最佳的转换组合以及将它们放置在工作中的位置 . 创建回答问题所需的最简单,最有效的算法 .