首页 文章

Apache Kafka客户端何时抛出“批量过期”异常?

提问于
浏览
33

使用Apache Kafka Java客户端(0.9),我正在尝试使用Kafka Producer class向代理发送一长串记录 .

异步send method会立即返回一段时间,然后在短时间内开始阻止每次通话 . 大约30秒后,客户端开始抛出异常(TimeoutException),消息为"Batch expired" .

什么情况导致抛出此异常?

6 回答

  • 2

    此异常表示您以比发送速度更快的速率排队记录 .

    当您调用send方法时,ProducerRecord将存储在内部缓冲区中以发送给代理 . 一旦ProducerRecord被缓冲,该方法立即返回,无论它是否已被发送 .

    记录被分组为批次以发送给代理,以减少每条消息的传输被窃听并增加吞吐量 .

    添加批处理记录后,发送该批处理有一个时间限制,以确保它在指定的持续时间内发送 . 这由Producer配置参数request.timeout.ms控制,默认为30秒 .

    如果批处理的排队时间超过超时限制,则将引发异常 . 该批次中的记录将从发送队列中删除 .

    Increasing the timeout limit, using the configuration parameter, will allow the client to queue batches for longer before expiring.

  • 1

    我在一个完全不同的背景下得到了这个例外 .

    我已经设置了一个由zookeeper vm,一个代理vm和一个 生产环境 者/消费者vm组成的迷你集群 . 我打开了服务器(9092)和zookeeper(2181)上的所有必要端口,然后尝试将消费者/发布者vm的消息发布到代理 . 我得到了OP提到的异常,但由于到目前为止我只发布了一条消息(或者至少我试过),解决方案无法增加超时或批量大小 . 所以我搜索并发现这个邮件列表描述了我在尝试使用消费者/ 生产环境 者vm(ClosedChannelException)中的消息时遇到的类似问题:http://grokbase.com/t/kafka/users/152jsjekrm/having-trouble-with-the-simplest-remote-kafka-config此邮件列表中的最后一篇文章实际上描述了如何解决问题 .

    简而言之,如果您同时面对 ChannelClosedExceptionBatch Expired 异常,则可能需要在 server.config 文件中将此行更改为以下内容并重新启动代理:

    advertised.host.name=<broker public IP address>
    

    如果没有设置,它会回退到 host.name 属性(可能也没有设置),然后回退到 InetAddress Java类的规范主机名,这当然不正确,因而混淆了远程节点 .

  • 31

    在发送到代理之前控制时间的参数是 linger.ms . 其默认值为0(无延迟) .

  • 46

    我使用的是Kafka Java客户端版本0.11.0.0 . 我也开始看到相同的模式未能始终如一地生成大型消息 . 它传递了一些消息,而其他一些消息则失败了 . (虽然通过和失败的消息大小相同) . 在我的情况下,每个消息大小约为60KB,远远高于Kafka的默认值 batch.size 为16kB,同样我的 linger.ms 被设置为默认值为0.此错误正在当 生产环境 者客户端在从服务器收到成功响应之前超时时抛出 . 基本上,在我的代码中,这个调用是超时的: kafkaProd.send(pr).get() . 为了解决这个问题,我不得不将Producer客户端的默认值 request.timeout.ms 增加到60000

  • -4

    Kafka在docker-compose中遇到了类似的问题 . 我的docker-compose.yml设置为

    KAFKA_ADVERTISED_HOST_NAME: kafka
     ports:
            - 9092:9092
    

    但是当我试图从码头外面用骆驼发送消息时

    to("kafka:test?brokers=localhost:9092")
    

    我有一个TimeoutException . 我通过添加来解决它

    127.0.0.1 kafka
    

    到Windows \ System32 \ drivers \ etc \ hosts文件,然后将我的骆驼网址更改为

    to("kafka:test?brokers=kafka:9092")
    
  • 3

    当您将消费者集ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG创建为true时 .

相关问题