首页 文章

使用datastax java驱动程序异步写入cassandra的有效方法?

提问于
浏览
2

我使用datastax java驱动程序3.1.0连接到cassandra集群,我的cassandra集群版本是2.0.10 . 我与QUORUM一致性异步编写 .

public void save(final String process, final int clientid, final long deviceid) {
    String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
    try {
      BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
      bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
      bs.setString(0, process);
      bs.setInt(1, clientid);
      bs.setLong(2, deviceid);

      ResultSetFuture future = session.executeAsync(bs);
      Futures.addCallback(future, new FutureCallback<ResultSet>() {
        @Override
        public void onSuccess(ResultSet result) {
          logger.logInfo("successfully written");
        }

        @Override
        public void onFailure(Throwable t) {
          logger.logError("error= ", t);
        }
      }, Executors.newFixedThreadPool(10));
    } catch (Exception ex) {
      logger.logError("error= ", ex);
    }
  }

以下是我的 CacheStatement 课程:

public class CacheStatement {
  private static final Map<String, PreparedStatement> cache =
      new ConcurrentHashMap<>();

  private static class Holder {
    private static final CacheStatement INSTANCE = new CacheStatement();
  }

  public static CacheStatement getInstance() {
    return Holder.INSTANCE;
  }

  private CacheStatement() {}

  public BoundStatement getStatement(String cql) {
    Session session = CassUtils.getInstance().getSession();
    PreparedStatement ps = cache.get(cql);
    // no statement cached, create one and cache it now.
    if (ps == null) {
      synchronized (this) {
        ps = cache.get(cql);
        if (ps == null) {
          cache.put(cql, session.prepare(cql));
        }
      }
    }
    return ps.bind();
  }
}

我的上面的 save 方法将从多个线程调用,我认为 BoundStatement 不是线程安全的 . 顺便说一句 StatementCache 类是线程安全的,如上所示 .

  • 由于 BoundStatement 不是线程安全的 . 如果我从多个线程异步写入,我上面的代码会有任何问题吗?

  • 其次,我在 addCallback 参数中使用 Executors.newFixedThreadPool(10) . 这可以,否则会出现问题?或者我应该使用 MoreExecutors.directExecutor . 那么这两者有什么区别呢?对此最好的方法是什么?

下面是我使用datastax java驱动程序连接到cassandra的连接设置:

Builder builder = Cluster.builder();
    cluster =
        builder
            .addContactPoints(servers.toArray(new String[servers.size()]))
            .withRetryPolicy(new LoggingRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE))
            .withPoolingOptions(poolingOptions)
            .withReconnectionPolicy(new ConstantReconnectionPolicy(100L))
            .withLoadBalancingPolicy(
                DCAwareRoundRobinPolicy
                    .builder()
                    .withLocalDc(
                        !TestUtils.isProd() ? "DC2" : TestUtils.getCurrentLocation()
                            .get().name().toLowerCase()).withUsedHostsPerRemoteDc(3).build())
            .withCredentials(username, password).build();

1 回答

  • 1

    我觉得你做的很好 . 您可以通过在应用程序启动时准备所有语句来进一步优化,因此您已经缓存了所有内容,因此在“保存”时不会因准备语句而获得任何性能影响,并且您不会在工作流中锁定任何内容 .

    BoundStatement 不是线程安全的,但是 PreparedStatement 是的,并且每次调用 getStatement 时都会返回一个新的 BoundStatement . 实际上, PreparedStatement.bind() 函数实际上是 new BoundStatement(ps).bind() 的快捷方式 . 并且您不是从多个线程访问相同的 BoundStatement . 所以你的代码很好 .

    相反,对于线程池,您实际上是在每个 addCallback 函数上创建一个新的线程池 . 这是浪费资源 . 我不使用这个回调方法,我更喜欢自己管理普通的 FutureResultSet ,但我在数据共享文档中看到examples使用 MoreExecutors.sameThreadExecutor() 而不是 MoreExecutors.directExecutor() .

相关问题