我需要使用Datastax Java驱动程序批量写入Cassandra,这是我第一次尝试使用批处理与datastax Java驱动程序,所以我有一些困惑 -
下面是我的代码,我在其中尝试创建一个Statement对象并将其添加到Batch并将ConsistencyLevel设置为QUORUM .
Session session = null;
Cluster cluster = null;
// we build cluster and session object here and we use DowngradingConsistencyRetryPolicy as well
// cluster = builder.withSocketOptions(socketOpts).withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
public void insertMetadata(List<AddressMetadata> listAddress) {
// what is the purpose of unloggedBatch here?
Batch batch = QueryBuilder.unloggedBatch();
try {
for (AddressMetadata data : listAddress) {
Statement insert = insertInto("test_table").values(
new String[] { "address", "name", "last_modified_date", "client_id" },
new Object[] { data.getAddress(), data.getName(), data.getLastModifiedDate(), 1 });
// is this the right way to set consistency level for Batch?
insert.setConsistencyLevel(ConsistencyLevel.QUORUM);
batch.add(insert);
}
// now execute the batch
session.execute(batch);
} catch (NoHostAvailableException e) {
// log an exception
} catch (QueryExecutionException e) {
// log an exception
} catch (QueryValidationException e) {
// log an exception
} catch (IllegalStateException e) {
// log an exception
} catch (Exception e) {
// log an exception
}
}
以下是我的 AddressMetadata
课程 -
public class AddressMetadata {
private String name;
private String address;
private Date lastModifiedDate;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
public Date getLastModifiedDate() {
return lastModifiedDate;
}
public void setLastModifiedDate(Date lastModifiedDate) {
this.lastModifiedDate = lastModifiedDate;
}
}
现在我的问题是 - 我使用Batch来插入带有Datastax Java Driver的cassandra的方式是否正确?那么重试策略呢,这意味着如果批处理语句执行失败,那么会发生什么,它会再次重试吗?
有没有更好的方法使用Java驱动程序使用批量写入cassandra?
2 回答
首先有点咆哮:
Cassandra中的batch关键字是 not 一种性能优化,用于将大量数据批处理批量加载 .
批处理用于将原子操作组合在一起,这些操作是您希望一起发生的操作 . 批次保证如果批次的单个部分成功,则整个批次成功 .
Using batches will probably not make your mass ingestion run faster
现在提出您的问题:
Cassandra使用称为批量日志记录的机制来确保批处理的原子性 . 通过指定未记录的批处理,您将关闭此功能,因此批处理不再是原子的,可能会因部分完成而失败 . 当然,记录批次并确保其原子性会有性能损失,使用未记录的批次将消除此惩罚 .
在某些情况下,您可能希望使用未记录的批次来确保属于同一分区的请求(插入)一起发送 . 如果将操作一起批处理并且需要在不同的分区/节点中执行,则实质上是为协调器创建了更多工作 . 请参阅Ryan博客中的具体示例:
阅读这篇文章
我在这里看不到你的代码有什么问题,只取决于你想要实现的目标 . 深入了解我分享的博客文章,以获得更多洞察力 .
如果它失败,它自己的批处理将不会自行重试 . 驱动程序确实有重试策略,但您必须单独应用它们 .
java驱动程序中的默认策略仅在以下场景中重试:
在读取超时时,如果回复了足够的副本但未检索到数据 .
在写入超时时,如果我们在编写批处理语句使用的分布式日志时超时 .
阅读有关default policy的更多信息,并根据您的使用案例考虑less conservative policies .
我们在使用异步和批处理之间争论了一段时间 . 我们尝试了两者进行比较 . 与单个"async"请求相比,我们使用"unlogged batches"获得了更好的吞吐量 . 我们不知道为什么,但基于Ryan's blog,我猜它与写入大小有关 . 我们可能正在做太多较小的写入,因此对它们进行批处理可能会给我们带来更好的性能,因为它可以减少网络流量
我必须提到,我们甚至没有以推荐的方式进行“未登记的批次” . 建议的方法是使用单分区键执行批处理 . 基本上,批处理属于同一分区键的所有记录 . 但是,我们只是批处理可能属于不同分区的一些记录 .
有人做了一些基准测试来比较async和"unlogged batches",我们发现它非常有用 . 这是link .