我已经在本地机器上设置了Spark 2.0和Cassandra 3.0(8核,16gb ram)用于测试目的,并按如下方式编辑 spark-defaults.conf
:
spark.python.worker.memory 1g
spark.executor.cores 4
spark.executor.instances 4
spark.sql.shuffle.partitions 4
接下来我在Cassandra中导入了150万行:
test(
tid int,
cid int,
pid int,
ev list<double>,
primary key (tid)
)
test.ev
是包含数值的列表,即 [2240,2081,159,304,1189,1125,1779,693,2187,1738,546,496,382,1761,680]
现在在代码中,为了测试整个事情我刚刚创建了 SparkSession
,连接到Cassandra并进行简单的选择计数:
cassandra = spark.read.format("org.apache.spark.sql.cassandra")
df = cassandra.load(keyspace="testks",table="test")
df.select().count()
此时,Spark输出 count
并需要大约28秒才能完成 Job
,分布在13 Tasks
(在 Spark UI
中,任务的总输入为331.6MB)
Questions:
-
这是预期的表现吗?如果没有,我错过了什么?
-
Theory说DataFrame的分区数决定了Spark将分配作业的任务数 . 如果我将
spark.sql.shuffle.partitions
设置为4,为什么要创建13个任务? (还确保在我的DataFrame上调用rdd.getNumPartitions()
的分区数)
Update
我想测试这个数据的常见操作:
-
查询大型数据集,例如,按
pid
分组的100,000~N行 -
选择
ev
,alist<double>
-
对每个成员执行平均值,假设现在每个列表具有相同的长度,即
df.groupBy('pid').agg(avg(df['ev'][1]))
正如@ zero323建议的那样,我为Cassandra部署了一台外部机器(2Gb RAM,4核,SSD),仅用于此测试,并加载了相同的数据集 . 与我之前的测试相比,df.select() . count()的结果是预期更大的延迟和整体性能更差(完成Job约需70秒) .
Edit: 我误解了他的建议 . @ zero323意味着让Cassandra执行计数而不是使用Spark SQL,如_2543939中所述
另外我想指出的是,我知道为这种类型的数据设置 list<double>
而不是宽行的固有反模式,但此时我关注的是更多花在检索大型数据集上的时间而不是实际平均计算时间 .
1 回答
它看起来很慢,但并不完全出乎意料 . 一般来说
count
表示为接着是Spark方面的总结 . 因此,虽然它被优化,但它仍然相当低效,因为你从外部源获取N个长整数只是为了在本地求和 .
正如_2543943所述_Cassandra支持的RDD(不是
Datasets
)提供了优化的cassandraCount
方法,该方法执行服务器端计数 .因为这里没有使用
spark.sql.shuffle.partitions
. 此属性用于确定shuffle的分区数(当数据由某些键集合聚合时),而不是用于Dataset
创建或全局聚合,如count(*)
(总是使用1个分区进行最终聚合) .如果您对控制初始分区的数量感兴趣,请查看spark.cassandra.input.split.size_in_mb,它定义:
正如你可以看到的另一个因素是
spark.default.parallelism
但它并不是一个微妙的配置,所以一般来说取决于它不是一个最佳选择 .