作为新手上的apache引发了一些关于在Spark上获取Cassandra数据的问题 .
List<String> dates = Arrays.asList("2015-01-21","2015-01-22");
CassandraJavaRDD<A> aRDD = CassandraJavaUtil.javaFunctions(sc).
cassandraTable("testing", "cf_text",CassandraJavaUtil.mapRowTo(A.class, colMap)).
where("Id=? and date IN ?","Open",dates);
此查询不过滤cassandra服务器上的数据 . 虽然这个java语句正在执行它的内存并最终抛出spark java.lang.OutOfMemoryError异常 . 查询应该过滤掉cassandra服务器上的数据而不是客户端上的数据,如https://github.com/datastax/spark-cassandra-connector/blob/master/doc/3_selection.md所述 .
虽然我正在使用cassandra cqlsh上的过滤器执行查询,但它执行正常但执行查询而没有filter(where子句)正在给出预期的超时 . 因此很明显,火花并没有在客户端应用过滤器 .
SparkConf conf = new SparkConf();
conf.setAppName("Test");
conf.setMaster("local[8]");
conf.set("spark.cassandra.connection.host", "192.168.1.15")
为什么在客户端应用过滤器以及如何在服务器端应用过滤器进行改进 .
我们如何在Windows平台上的cassandra集群上配置spark集群?
3 回答
没有使用Cassandra和Spark,从阅读你提供的部分(感谢你)我看到:
我测试了"IN"谓词不支持:参见https://github.com/datastax/spark-cassandra-connector/blob/24fbe6a10e083ddc3f770d1f52c07dfefeb7f59a/spark-cassandra-connector-java/src/main/java/com/datastax/spark/connector/japi/rdd/CassandraJavaRDD.java#L80
因此,您可以尝试将where子句限制为Id(假设存在二级索引)并对日期范围使用spark过滤 .
我建议将表格作为DataFrame而不是RDD来阅读 . 这些可用于Spark 1.3及更高版本 . 然后,您可以将CQL查询指定为如下字符串:
所以尝试一下,看看它是否适合你 .
在DataFrame中获得数据后,就可以对其运行Spark SQL操作 . 如果您想要RDD中的数据,可以将DataFrame转换为RDD .
在SparkConfing中设置spark.cassandra.input.split.size_in_mb解决了这个问题 .
Spark-cassnadra-connector读取spark.cassandra.input.split.size_in_mb的错误值,因此在SparkConf中覆盖此值可以完成工作 . 现在IN子句也很好用 .