首页 文章

SparkSQL下推过滤不适用于Spark Cassandra Connector

提问于
浏览
2

我有一个表模式

appname text,
randomnum int,
addedtime timestamp,
shortuuid text,
assetname text,
brandname text,

PRIMARY KEY ((appname, randomnum), addedtime, shortuuid)

addedtime是集群密钥

现在,当我在集群密钥添加时使用下推过滤器时,我看不到它被应用了

val rdd = tabledf.filter("addedtime > '" + _to + "'").explain
== Physical Plan ==
Filter (cast(addedtime#2 as string) > 2016-12-20 11:00:00)

根据文档,它应该被应用https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md#pushdown-filter-examples

它也在spark cassandra连接器1.4中工作,但没有使用最新的一个cassandra连接器1.6.0-M1 . 请让我知道这个问题

2 回答

  • 5

    问题分析

    问题似乎是Catalyst处理比较的方式 .

    做的时候

    val rdd = tabledf.filter("addedtime > '" + _to + "'").explain
    

    它将addedTime列转换为String,然后进行比较 . Catalyst没有将此谓词提供给Spark Cassandra Connector,因此无法推送它 .

    INFO  2016-03-08 17:10:49,011 org.apache.spark.sql.cassandra.CassandraSourceRelation: Input Predicates: []
    Filter (cast(addedtime#2 as string) > 2015-08-03)
    

    这也是错误的,因为它正在进行字符串比较(这在词汇上会有效,但实际上并不是你想要做的)所以这看起来像是Catalyst中的一个错误,因为我们应该将谓词提供给源代码,即使有一个“投” . 有一种解决方法,但它涉及为Catalyst优化器提供它想要查看的内容 .

    解决方法

    相反,我们给出一个类型提示

    df.filter("addedtime > cast('2015-08-03' as timestamp)").explain
    

    然后Spark将生成正确的比较,而不使用字符串Cast

    DEBUG 2016-03-08 17:11:09,792 org.apache.spark.sql.cassandra.CassandraSourceRelation: Basic Rules Applied:
    C* Filters: [GreaterThan(addedtime,2015-08-03 00:00:00.0)]
    Spark Filters []
    
    == Physical Plan ==
    Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@332464fe[appname#0,randomnum#1,addedtime#2,shortuuid#3] PushedFilters: [GreaterThan(addedtime,2015-08-03 00:00:00.0)]
    
  • 0

    您还可以使用java.sql.Timestamp

    val dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")
    val date = LocalDateTime.parse("2015-08-03", dateFormatter)
    val timestamp= Timestamp.from(date.atZone(ZoneId.systemDefault()).toInstant)
    
    df.filter($"addedtime" > timestamp).explain
    

相关问题