I am using DSE 5.0.0. I created the following table on a single-node Cassandra cluster:

CREATE TABLE IF NOT EXISTS dummy (
  id uuid,
  txt text,
  PRIMARY KEY (id)
);

INSERT INTO dummy(id, txt) values (uuid(), 'hello world');

Then, when I query for a specific id through the Spark Cassandra connector, I get no results:

val df = sqlc.read.format("org.apache.spark.sql.cassandra")
         .options(Map("table" -> "mytable", "keyspace" -> "myks"))
         .load()

df.show(false)

// +------------------------------------+-----------+
// |id                                  |txt        |
// +------------------------------------+-----------+
// |2b69ddc1-2c15-485d-a30f-1b2d7f86c200|hello world|
// +------------------------------------+-----------+

df.filter("id = '2b69ddc1-2c15-485d-a30f-1b2d7f86c200'").show

// 16/07/28 08:51:43 DEBUG CassandraTableScanRDD: Fetching data for range (token("id") <= ?,List(-9223372036854775808)) with SELECT "id", "txt" FROM "myks"."mytable" WHERE token("id") <= ? AND "id" = ?   ALLOW FILTERING with params [-9223372036854775808,2b69ddc1-2c15-485d-a30f-1b2d7f86c200]
// +---+---+
// | id|txt|
// +---+---+
// +---+---+

It looks like the query generated by the connector contains the following incorrect predicate:

WHERE token("id") <= Long.MinValue
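
To make it concrete why I think this predicate can never match a row, here is a minimal Java sketch. It is only a model of the comparison, not connector or driver code: the class and method names are made up for illustration, and it assumes (as I believe is the case with Murmur3Partitioner) that tokens are signed 64-bit values and that no row is ever assigned the minimum token itself.

```java
public class MinTokenPredicate {

    // Hypothetical stand-in for the generated clause: token("id") <= upperBound.
    public static boolean matchesUpperBoundOnly(long rowToken, long upperBound) {
        return rowToken <= upperBound;
    }

    public static void main(String[] args) {
        // Same bound as the parameter shown in the debug log.
        long minToken = Long.MIN_VALUE;

        // Arbitrary sample tokens; none of them equals the minimum token.
        long[] sampleTokens = {-42L, 0L, 7_000_000_000L, Long.MAX_VALUE};

        for (long token : sampleTokens) {
            System.out.println(token + " -> " + matchesUpperBoundOnly(token, minToken));
        }
    }
}
```

Every sample token evaluates to false, which is consistent with the empty DataFrame above: the extra `"id" = ?` condition never gets a chance to match.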

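By setting a few breakpoints, I found that the metadata built by the Cassandra driver deliberately sets the TokenRange to ]minToken, minToken] when the ring contains a single token: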

// com.datastax.driver.core.Metadata, line 671
    private static Set<TokenRange> makeTokenRanges(List<Token> ring, Token.Factory factory) {
        ImmutableSet.Builder<TokenRange> builder = ImmutableSet.builder();
        // JAVA-684: if there is only one token, return the range ]minToken, minToken]
        if (ring.size() == 1) {
            builder.add(new TokenRange(factory.minToken(), factory.minToken(), factory));

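If I modify the driver code above to return ]minToken, ring(0)] instead, my DataFrame returns the expected result. Could this be a bug in the Cassandra driver and/or the connector, or am I doing something wrong?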
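
To illustrate my suspicion, here is a small self-contained Java sketch of how a range-to-CQL translation could turn ]minToken, minToken] into the predicate from the log. This is a guess at the mechanism, not the connector's actual code: `naiveWhere` and `wrapAwareWhere` are hypothetical helpers, and the column name `id` is hard-coded for brevity.

```java
public class TokenRangeSketch {

    static final long MIN_TOKEN = Long.MIN_VALUE;

    // Hypothetical naive translation of the range ]start, end] into a CQL fragment.
    // A start equal to minToken is read as "no lower bound", and that check runs
    // first, so ]minToken, minToken] collapses to an upper bound of minToken.
    public static String naiveWhere(long start, long end) {
        if (start == MIN_TOKEN) {
            return "token(\"id\") <= " + end;
        }
        if (end == MIN_TOKEN) {
            return "token(\"id\") > " + start;
        }
        return "token(\"id\") > " + start + " AND token(\"id\") <= " + end;
    }

    // Hypothetical wrap-aware translation: ]minToken, minToken] is treated as the
    // whole ring, so no token restriction is emitted at all.
    public static String wrapAwareWhere(long start, long end) {
        if (start == MIN_TOKEN && end == MIN_TOKEN) {
            return "";
        }
        return naiveWhere(start, end);
    }

    public static void main(String[] args) {
        System.out.println("naive:      [" + naiveWhere(MIN_TOKEN, MIN_TOKEN) + "]");
        System.out.println("wrap-aware: [" + wrapAwareWhere(MIN_TOKEN, MIN_TOKEN) + "]");
    }
}
```

If the connector does something along the lines of `naiveWhere`, then the driver's ]minToken, minToken] convention for a single-token ring and the connector's translation simply disagree about what that range means, which would explain why changing the end of the range to ring(0) makes the query work.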