使用Apache Kafka Java客户端(0.9),我正在尝试使用Kafka Producer class向代理发送一长串记录 .
异步send method会立即返回一段时间,然后在短时间内开始阻止每次通话 . 大约30秒后,客户端开始抛出异常(TimeoutException),消息为"Batch expired" .
什么情况导致抛出此异常?
使用Apache Kafka Java客户端(0.9),我正在尝试使用Kafka Producer class向代理发送一长串记录 .
异步send method会立即返回一段时间,然后在短时间内开始阻止每次通话 . 大约30秒后,客户端开始抛出异常(TimeoutException),消息为"Batch expired" .
什么情况导致抛出此异常?
6 回答
此异常表示您以比发送速度更快的速率排队记录 .
当您调用send方法时,ProducerRecord将存储在内部缓冲区中以发送给代理 . 一旦ProducerRecord被缓冲,该方法立即返回,无论它是否已被发送 .
记录被分组为批次以发送给代理,以减少每条消息的传输被窃听并增加吞吐量 .
添加批处理记录后,发送该批处理有一个时间限制,以确保它在指定的持续时间内发送 . 这由Producer配置参数request.timeout.ms控制,默认为30秒 .
如果批处理的排队时间超过超时限制,则将引发异常 . 该批次中的记录将从发送队列中删除 .
Increasing the timeout limit, using the configuration parameter, will allow the client to queue batches for longer before expiring.
我在一个完全不同的背景下得到了这个例外 .
我已经设置了一个由zookeeper vm,一个代理vm和一个 生产环境 者/消费者vm组成的迷你集群 . 我打开了服务器(9092)和zookeeper(2181)上的所有必要端口,然后尝试将消费者/发布者vm的消息发布到代理 . 我得到了OP提到的异常,但由于到目前为止我只发布了一条消息(或者至少我试过),解决方案无法增加超时或批量大小 . 所以我搜索并发现这个邮件列表描述了我在尝试使用消费者/ 生产环境 者vm(ClosedChannelException)中的消息时遇到的类似问题:http://grokbase.com/t/kafka/users/152jsjekrm/having-trouble-with-the-simplest-remote-kafka-config此邮件列表中的最后一篇文章实际上描述了如何解决问题 .
简而言之,如果您同时面对
ChannelClosedException
和Batch Expired
异常,则可能需要在server.config
文件中将此行更改为以下内容并重新启动代理:如果没有设置,它会回退到
host.name
属性(可能也没有设置),然后回退到InetAddress
Java类的规范主机名,这当然不正确,因而混淆了远程节点 .在发送到代理之前控制时间的参数是
linger.ms
. 其默认值为0(无延迟) .我使用的是Kafka Java客户端版本0.11.0.0 . 我也开始看到相同的模式未能始终如一地生成大型消息 . 它传递了一些消息,而其他一些消息则失败了 . (虽然通过和失败的消息大小相同) . 在我的情况下,每个消息大小约为60KB,远远高于Kafka的默认值
batch.size
为16kB,同样我的linger.ms
被设置为默认值为0.此错误正在当 生产环境 者客户端在从服务器收到成功响应之前超时时抛出 . 基本上,在我的代码中,这个调用是超时的:kafkaProd.send(pr).get()
. 为了解决这个问题,我不得不将Producer客户端的默认值request.timeout.ms
增加到60000Kafka在docker-compose中遇到了类似的问题 . 我的docker-compose.yml设置为
但是当我试图从码头外面用骆驼发送消息时
我有一个TimeoutException . 我通过添加来解决它
到Windows \ System32 \ drivers \ etc \ hosts文件,然后将我的骆驼网址更改为
当您将消费者集ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG创建为true时 .