我使用datastax java驱动程序3.1.0连接到cassandra集群,我的cassandra集群版本是2.0.10 . 我与QUORUM一致性异步编写 .
public void save(final String process, final int clientid, final long deviceid) {
String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
try {
BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
bs.setString(0, process);
bs.setInt(1, clientid);
bs.setLong(2, deviceid);
ResultSetFuture future = session.executeAsync(bs);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
logger.logInfo("successfully written");
}
@Override
public void onFailure(Throwable t) {
logger.logError("error= ", t);
}
}, Executors.newFixedThreadPool(10));
} catch (Exception ex) {
logger.logError("error= ", ex);
}
}
以下是我的 CacheStatement
课程:
public class CacheStatement {
private static final Map<String, PreparedStatement> cache =
new ConcurrentHashMap<>();
private static class Holder {
private static final CacheStatement INSTANCE = new CacheStatement();
}
public static CacheStatement getInstance() {
return Holder.INSTANCE;
}
private CacheStatement() {}
public BoundStatement getStatement(String cql) {
Session session = CassUtils.getInstance().getSession();
PreparedStatement ps = cache.get(cql);
// no statement cached, create one and cache it now.
if (ps == null) {
synchronized (this) {
ps = cache.get(cql);
if (ps == null) {
cache.put(cql, session.prepare(cql));
}
}
}
return ps.bind();
}
}
我的上面的 save
方法将从多个线程调用,我认为 BoundStatement
不是线程安全的 . 顺便说一句 StatementCache
类是线程安全的,如上所示 .
-
由于
BoundStatement
不是线程安全的 . 如果我从多个线程异步写入,我上面的代码会有任何问题吗? -
其次,我在
addCallback
参数中使用Executors.newFixedThreadPool(10)
. 这可以,否则会出现问题?或者我应该使用MoreExecutors.directExecutor
. 那么这两者有什么区别呢?对此最好的方法是什么?
下面是我使用datastax java驱动程序连接到cassandra的连接设置:
Builder builder = Cluster.builder();
cluster =
builder
.addContactPoints(servers.toArray(new String[servers.size()]))
.withRetryPolicy(new LoggingRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE))
.withPoolingOptions(poolingOptions)
.withReconnectionPolicy(new ConstantReconnectionPolicy(100L))
.withLoadBalancingPolicy(
DCAwareRoundRobinPolicy
.builder()
.withLocalDc(
!TestUtils.isProd() ? "DC2" : TestUtils.getCurrentLocation()
.get().name().toLowerCase()).withUsedHostsPerRemoteDc(3).build())
.withCredentials(username, password).build();
1 回答
我觉得你做的很好 . 您可以通过在应用程序启动时准备所有语句来进一步优化,因此您已经缓存了所有内容,因此在“保存”时不会因准备语句而获得任何性能影响,并且您不会在工作流中锁定任何内容 .
BoundStatement
不是线程安全的,但是PreparedStatement
是的,并且每次调用getStatement
时都会返回一个新的BoundStatement
. 实际上,PreparedStatement
的.bind()
函数实际上是new BoundStatement(ps).bind()
的快捷方式 . 并且您不是从多个线程访问相同的BoundStatement
. 所以你的代码很好 .相反,对于线程池,您实际上是在每个
addCallback
函数上创建一个新的线程池 . 这是浪费资源 . 我不使用这个回调方法,我更喜欢自己管理普通的FutureResultSet
,但我在数据共享文档中看到examples使用MoreExecutors.sameThreadExecutor()
而不是MoreExecutors.directExecutor()
.