从Cassandra表加载数据时,spark分区表示具有相同分区键的所有行 . 但是,当我使用相同的分区键在spark中创建数据并使用.repartitionByCassandraReplica(..)方法重新分区新的RDD时,它最终会出现在不同的spark分区中?如何使用Spark-Cassandra连接器定义的分区方案在spark中实现一致的分区?
链接下载我测试的CQL和Spark作业代码
版本和其他信息
-
Spark:1.3
-
卡珊德拉:2.1
-
连接器:1.3.1
-
Spark节点(5)和Cass *集群节点(4)在不同的数据中心运行
代码提取 . 使用以上链接下载代码以获取更多详细信
Step 1 : Loads data into 8 spark partitions
Map<String, String> map = new HashMap<String, String>();
CassandraTableScanJavaRDD<TestTable> tableRdd = javaFunctions(conf)
.cassandraTable("testkeyspace", "testtable", mapRowTo(TestTable.class, map));
Step 2 : Repartition data into 8 partitions
.repartitionByCassandraReplica(
"testkeyspace",
"testtable",
partitionNumPerHost,
someColumns("id"),
mapToRow(TestTable.class, map));
Step 3: Print partition id and values for both rdds
rdd.mapPartitionsWithIndex(...{
@Override
public Iterator<String> call(..) throws Exception {
List<String> list = new ArrayList<String>();
list.add("PartitionId-" + integer);
while (itr.hasNext()) {
TestTable value = itr.next();
list.add(Integer.toString(value.getId()));
}
return list.iterator();
}
}, true).collect();
Step 4 : Snapshot of results printed on Partition 1. Different for both Rdds but expect to be same
加载Rdd值
----------------------------
Table load - PartitionId -1
----------------------------
15
22
--------------------------------------
Repartitioned values - PartitionId -1
--------------------------------------
33
16
1 回答
Cassandra副本的重新分区不会确定性地放置密钥 . 目前有一张票可以改变 .
https://datastax-oss.atlassian.net/projects/SPARKC/issues/SPARKC-278
现在解决方法是将Partitionspernode参数设置为1 .