我正在尝试使用Cassandra计数器和Datastax Java驱动程序进行一些实时分析 . 使用嵌入式Cassandra(版本2.1.9,由cassandra-unit提供)的单元测试成功通过但是当我将其部署到真实集群(版本2.2.3)时,我无法选择插入的内容 .

我的 Cassandra 表:

CREATE TABLE analytics.aggregate (
    tick bigint,
    tick_type text,
    aggregate_type text,
    dimensions frozen<map<text, text>>,
    value counter,
    PRIMARY KEY ((tick, tick_type, aggregate_type, dimensions))
);

我正在使用带有驱动程序映射包版本2.1.7.1的Datastax Java驱动程序 . 我定义了一个访问者界面:

@Accessor
public interface CassandraWriter {

    @Query(
            "UPDATE " +
                "analytics.aggregate " +
            "SET " +
                "value = value + :value " +
            "WHERE " +
                "tick = :tick AND " +
                "tick_type = :tick_type AND " +
                "aggregate_type = :aggregate_type AND " +
                "dimensions = :dimensions"
    )
    @QueryParameters(consistency = "QUORUM")
    Statement increment(
            @Param("tick") long tick,
            @Param("tick_type") String tickType,
            @Param("aggregate_type") String aggregateType,
            @Param("dimensions") Map<String, String> dimensions,
            @Param("value") long value
    );
}

并使用此代码进行插入:

BatchStatement batch = new BatchStatement(BatchStatement.Type.COUNTER);

for (MyPojo pojo : pojos) {
    batch.add(
        cassandraWriter.increment(
            pojo.getTick(),
            pojo.getTickType(),
            pojo.getAggregateType(),
            pojo.getDimensions(),
            pojo.getValue()
        )
    );
}

session.execute(batch);

(是的,我知道使用同步执行的批处理语句不是最佳实践,将其视为PoC)

最后,我无法查询插入的内容 . 在cqlsh中,使用通配符选择语句时:

SELECT * FROM analytics.aggregate LIMIT 10;

我得到了结果 . 但是,当我尝试选择where子句时,e . G . :

SELECT value 
FROM analytics.aggregate 
WHERE 
    tick=1455551820000 AND
    tick_type='MINUTE' AND
    aggregate_type='REQUEST_COUNT' AND 
    dimensions={'customer_id': '1', 'city_id': '2'};

我得到一个空的结果集 .

当我通过cqlsh手动插入记录时,上面的查询工作正常 . 所以在插入方面似乎存在问题 .

这是与此相关的事情:

Can't retrieve by UDT key from cassandra with datastax java driver

https://datastax-oss.atlassian.net/browse/JAVA-831

更新:回复辣椒粉的评论 . 是的我还试过个别陈述而不是批次,结果是一样的,问题仍然存在 .

我用于插入的代码:

List<ResultSetFuture> futures = new ArrayList<>(pojos.size());
for (MyPojo pojo : pojos) {
    futures.add(
        session.executeAsync(
            cassandraWriter.increment(
                pojo.getTick(),
                pojo.getTickType(),
                pojo.getAggregateType(),
                pojo.getDimensions(),
                pojo.getValue()
            )
        )
    );
}

for (ResultSetFuture future : futures) {
    future.getUninterruptibly();
}

MyPojo真的只是一个pojo,没有什么特别之处:

public class MyPojo {

    private long tick;
    private String tickType;
    private String aggregateType;
    private Map<String, String> dimensions;
    private long value;

    public long getTick() {
        return tick;
    }

    public void setTick(long tick) {
        this.tick = tick;
    }

    public String getTickType() {
        return tickType;
    }

    public void setTickType(String tickType) {
        this.tickType = tickType;
    }

    public String getAggregateType() {
        return aggregateType;
    }

    public void setAggregateType(String aggregateType) {
        this.aggregateType = aggregateType;
    }

    public Map<String, String> getDimensions() {
        return dimensions;
    }

    public void setDimensions(Map<String, String> dimensions) {
        this.dimensions = dimensions;
    }

    public long getValue() {
        return value;
    }

    public void setValue(long value) {
        this.value = value;
    }
}

与此同时,我也尝试将Datastax驱动程序从v.2.1.7.1升级到3.0.0,但这没有用 .