我有两张 table :
带列的
-
my_keyspace.name:
-
name(字符串) - 分区键
-
timestamp(date) - 分区键的第二部分
-
id(int) - 分区键的第三部分
带列的
-
my_keyspace.data:
-
timestamp(date) - 分区键
-
id(int) - 分区键的第二部分
-
数据(字符串)
我正在尝试从名称表加入时间戳和id . 我是通过获取与给定名称关联的所有时间戳和ID并从数据表中检索这些条目的数据来实现的 .
在CQL中执行它真的很快 . 我预计Spark Cassandra会同样快速,但它似乎正在进行全表扫描 . 可能是由于不知道哪个字段是分区/主键 . 虽然我似乎无法找到一种方法来告诉它映射 .
如何使此连接尽可能高效?这是我的代码示例:
private static void notSoEfficientJoin() {
SparkConf conf = new SparkConf().setAppName("Simple Application")
.setMaster("local[*]")
.set("spark.cassandra.connection.host", "localhost")
.set("spark.driver.allowMultipleContexts", "true");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaPairRDD<DataKey, NameRow> nameIndexRDD = javaFunctions(sc).cassandraTable("my_keyspace", "name", mapRowTo(NameRow.class)).where("name = 'John'")
.keyBy(new Function<NameRow, DataKey>() {
@Override
public DataKey call(NameRow v1) throws Exception {
return new DataKey(v1.timestamp, v1.id);
}
});
JavaPairRDD<DataKey, DataRow> dataRDD = javaFunctions(sc).cassandraTable("my_keyspace", "data", mapRowTo(DataRow.class))
.keyBy(new Function<DataRow, DataKey>() {
@Override
public DataKey call(DataRow v1) throws Exception {
return new DataKey(v1.timestamp, v1.id);
}
});
JavaRDD<String> cassandraRowsRDD = nameIndexRDD.join(dataRDD)
.map(new Function<Tuple2<DataKey, Tuple2<NameRow, DataRow>>, String>() {
@Override
public String call(Tuple2<DataKey, Tuple2<NameRow, DataRow>> v1) throws Exception {
NameRow nameRow = v1._2()._1();
DataRow dataRow = v1._2()._2();
return nameRow + " " + dataRow;
}
});
List<String> collect = cassandraRowsRDD.collect();
}
1 回答
更有效地进行连接的方法是实际调用
joinWithCassandraTable
这可以通过使用另一个javaFunctions
调用包装结果来完成:重要的是用
javaFunctions
包装JavaRDD
. 所以有可能不在nameIndexRDD
上调用collect
和sc.parallelize