首页 文章

为什么Apache Spark在客户端上执行过滤器

提问于
浏览
1

作为新手上的apache引发了一些关于在Spark上获取Cassandra数据的问题 .

List<String> dates = Arrays.asList("2015-01-21","2015-01-22");
CassandraJavaRDD<A> aRDD = CassandraJavaUtil.javaFunctions(sc).
                    cassandraTable("testing", "cf_text",CassandraJavaUtil.mapRowTo(A.class, colMap)).
                    where("Id=? and date IN ?","Open",dates);

此查询不过滤cassandra服务器上的数据 . 虽然这个java语句正在执行它的内存并最终抛出spark java.lang.OutOfMemoryError异常 . 查询应该过滤掉cassandra服务器上的数据而不是客户端上的数据,如https://github.com/datastax/spark-cassandra-connector/blob/master/doc/3_selection.md所述 .

虽然我正在使用cassandra cqlsh上的过滤器执行查询,但它执行正常但执行查询而没有filter(where子句)正在给出预期的超时 . 因此很明显,火花并没有在客户端应用过滤器 .

SparkConf conf = new SparkConf();
            conf.setAppName("Test");
            conf.setMaster("local[8]");
            conf.set("spark.cassandra.connection.host", "192.168.1.15")

为什么在客户端应用过滤器以及如何在服务器端应用过滤器进行改进 .

我们如何在Windows平台上的cassandra集群上配置spark集群?

3 回答

  • 1

    没有使用Cassandra和Spark,从阅读你提供的部分(感谢你)我看到:

    注意:虽然ALLOW FILTERING子句隐式添加到生成的CQL查询中,但Cassandra引擎当前不允许所有谓词 . 这种限制将在未来的Cassandra版本中得到解决 . 目前,ALLOW FILTERING适用于由二级索引或聚类列索引的列 .

    我测试了"IN"谓词不支持:参见https://github.com/datastax/spark-cassandra-connector/blob/24fbe6a10e083ddc3f770d1f52c07dfefeb7f59a/spark-cassandra-connector-java/src/main/java/com/datastax/spark/connector/japi/rdd/CassandraJavaRDD.java#L80

    因此,您可以尝试将where子句限制为Id(假设存在二级索引)并对日期范围使用spark过滤 .

  • 2

    我建议将表格作为DataFrame而不是RDD来阅读 . 这些可用于Spark 1.3及更高版本 . 然后,您可以将CQL查询指定为如下字符串:

    CassandraSQLContext sqlContext = new CassandraSQLContext(sc);
    
    String query = "SELECT * FROM testing.cf_text where id='Open' and date IN ('2015-01-21','2015-01-22')";
    DataFrame resultsFrame = sqlContext.sql(query);
    
    System.out.println(resultsFrame.count());
    

    所以尝试一下,看看它是否适合你 .

    在DataFrame中获得数据后,就可以对其运行Spark SQL操作 . 如果您想要RDD中的数据,可以将DataFrame转换为RDD .

  • 1

    在SparkConfing中设置spark.cassandra.input.split.size_in_mb解决了这个问题 .

    conf = new SparkConf();
            conf.setAppName("Test");
            conf.setMaster("local[4]");
            conf.set("spark.cassandra.connection.host", "192.168.1.15").
            set("spark.executor.memory", "2g").
            set("spark.cassandra.input.split.size_in_mb", "67108864");
    

    Spark-cassnadra-connector读取spark.cassandra.input.split.size_in_mb的错误值,因此在SparkConf中覆盖此值可以完成工作 . 现在IN子句也很好用 .

相关问题