首页 文章

Cassandra Spark Connector和过滤数据

提问于
浏览
0

我正在使用Spark 1.3.1,我编写了一个小程序来过滤cassandra上的数据

val sc = new SparkContext(conf)
val rdd = sc.cassandraTable("foo", "bar")
val date = DateTime.now().minusHours(1)
val rdd2 = rdd.filter(r => r.getDate("date").after(date.toDate))
println(rdd2.count())
sc.stop()

这个程序运行很长时间,打印消息

16/09/01 21:10:31 INFO Executor: Running task 46.0 in stage 0.0 (TID 46)
16/09/01 21:10:31 INFO TaskSetManager: Finished task 42.0 in stage 0.0 (TID 42) in 20790 ms on localhost (43/1350)

如果我终止程序并将我的代码更改为

val date = DateTime.now().minusHours(1)
val rdd2 = rdd.filter(r => r.getDate("date").after(date.toDate))

它仍然会运行很长时间的消息

6/09/01 21:14:01 INFO Executor: Running task 8.0 in stage 0.0 (TID 8)
16/09/01 21:14:01 INFO TaskSetManager: Finished task 4.0 in stage 0.0 (TID 4) in 19395 ms on localhost (5/1350)

因此,程序似乎总是尝试将整个cassandra表加载到内存中(或尝试完全扫描),然后才应用过滤器 . 这对我来说似乎效率极低 .

如何以更好的方式编写此代码,以便spark不会尝试将整个cassandra表(或完全扫描)加载到RDD中,然后才应用过滤器?

1 回答

  • 1

    你的第一段代码

    val rdd = sc.cassandraTable("foo", "bar")
    val date = DateTime.now().minusDays(30)
    rdd.filter(r => r.getDate("date").after(date.toDate)).count // Count Filtered RDD
    

    所以要小心 . RDD是不可变的,因此当您应用过滤器时,您需要使用返回的RDD而不是您应用该函数的RDD .


    val rdd = sc.cassandraTable("foo", "bar")
    val date = DateTime.now().minusDays(30)
    rdd.filter(r => r.getDate("date").after(date.toDate)) // Filters RDD
    println(rdd.cassandraCount()) // Ignores filtered rdd and counts everything
    

    为了提高Cassandra的阅读效率:

    如果您的日期列是群集键,则可以使用 .where 函数将谓词下推到Cassandra . 除此之外,您无法修剪数据服务器端 .

    https://github.com/datastax/spark-cassandra-connector/blob/master/doc/3_selection.md#filtering-rows---where

相关问题