首页 文章

如果发生读取超时,如何在Cassandra中重试

提问于
浏览
1

在我的 生产环境 Cassandra Cluster(10个节点)中,我经常得到 ReadTimeoutException . 因此,要在我的本地开发环境(四个节点的Cassandra Cluster)中重现此问题,我运行了我的代码,然后停止了两个CassandraDaemon . 我有以下异常

线程“main”中的异常com.datastax.driver.core.exceptions.UnavailableException:在com.datastax.driver.core.exceptions.UnavailableException.copy中,在一致性ONE(需要1但只有0)的查询中没有足够的副本可用于查询(unavailableException.java:79)com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:269)at com.datastax.driver.core.ArrayBackedResultSet $ MultiPage.prepareNextRow(ArrayBackedResultSet.java:285)at com . 位于com.datastax.driver.core.ArrayBackedResultSet $ 1.hasNext(ArrayBackedResultSet.java:126)的com.cleartrail.keyspacedatamigrator.migrator.Migrator.migrateTimeline上的datastax.driver.core.ArrayBackedResultSet $ MultiPage.isExhausted(ArrayBackedResultSet.java:245) (Migrator.java:376)位于TestMigration.main(TestMigration.java:9)的com.cleartrail.keyspacedatamigrator.migrator.Migrator.migrateData(Migrator.java:267)中 . 引起:com.datastax.driver.core.exceptions . UnavailableException:没有足够的副本av com.datastax.driver.core.exceptions.UnavailableException.copy(UnavailableException.java:79)at com.datastax.driver.core.Responses $ Error.asException(在一致性ONE(需要1但只有0)的查询时可以查询在com.datastax.driver.core.ArrayBackedResultSet $ MultiPage $ 1.onSet(ArrayBackedResultSet.java:352)com.com的com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:183)上的Responses.java:94) . 位于com.datastax.driver.core.RequestHandler的指数$ 2300(RequestHandler.java:45)com.datastax.driver.core.RequestHandler $ SpeculativeExecution的$ SpeculativeExecution.setFinalResult(RequestHandler.java:748) .onSet(RequestHandler.java:587)at com.datastax.driver.core.Connection $ Dispatcher.channelRead0(Connection.java:991)at com.datastax.driver.core.Connection $ Dispatcher.channelRead0(Connection.java:913 )在io.netty.channel.AbstractChannelHan的io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) doContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)位于io.netty的io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)位于io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)的.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)at at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java: 242)在io.netty.channe的io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) l.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)at io.netty.channel.nio.AbstractNioByteChannel $ NioByteUnsafe.read(AbstractNioByteChannel.java:131) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)at io.netty.channel.nio.NioEventLoop.processSelectedKeys (NioEventLoop.java:382)io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)at io.netty.util.concurrent.SingleThreadEventExecutor $ 2.run(SingleThreadEventExecutor.java:111)at java.lang .Thread.run(Thread.java:745)引起:com.datastax.driver.core.exceptions.UnavailableException:没有足够的副本可用于com.datastax.driver的一致性ONE(需要1个但只有0个)的查询 . core.Responses $ error $ 1.decode(Responses.java:48)at com.datastax.driver.core.Responses $ Error $ 1.decode(Responses.java:37)at com.datastax.driver.core.Message $ ProtocolDecoder.decode(Message.java:213)atcom.datastax.driver.core.Message $ ProtocolDecoder.decode(Message.java:204)at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:89)... 13更多

我的Cassandra连接代码如下所示

SocketOptions so = new SocketOptions();
so.setReadTimeoutMillis(Integer.MAX_VALUE);
so.setConnectTimeoutMillis(sockettimeoutinmillis);

Builder builder = new Cluster.Builder().
                addContactPoints(connectionpoints).withPort(port);

builder.withPoolingOptions(new PoolingOptions().setCoreConnectionsPerHost(HostDistance.LOCAL, new PoolingOptions().getMaxConnectionsPerHost(HostDistance.LOCAL)));

cluster = builder
               .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
               .withReconnectionPolicy(new ConstantReconnectionPolicy(10000L))
               .build();

session=cluster.connect();

我在连接到Cassandra时提供了重试策略,那为什么我会遇到这样的异常?我没有写任何特定的代码来处理 ReadTimeoutException 并重试 . 是否需要特定的代码或处理?

1 回答

  • 2

    如果您想要自定义逻辑来处理读/写超时和UnavailableExceptions,您可以实现自己的自定义RetryPolicy .

    您可以覆盖 onReadTimeout 方法以完全按照您的意愿运行 .

    但是你提供的例外是一个 UnavailableException ,这是一个cassandra协调员,告诉你没有副本可用来完成你的查询(这意味着拥有你试图读取的数据的所有副本都在C *中标记为DOWN),因此它甚至没有尝试过快速失败 . 在这种情况下,重试可能不会提供太多 Value ,因为您可能会遇到相同的结果 . 基于您指定的 RetryPolicyDowngradingConsistencyRetryPolicy ),可能发生的是遇到ReadTimeout或UnavailableException,并且RetryPolicy在较低的ConsistencyLevel(ONE)处再次尝试但仍然失败,因为遇到另一个UnavailableException .

    为您提出的一些问题可能有助于明确:

    • 您使用的Keyspace使用了哪些复制因子?您在开发环境中提到了4个节点 . 如果1个节点发生故障并且您的RF为1,则可能有25%的查询将遇到UnavailableException .

    • 您查询的一致性级别是什么?如果你不是在受益于你 .

相关问题