首页 文章

到期xxxxx的1条记录:自批量创建加上延迟时间以来已经过了30030毫秒

提问于
浏览
5

My use case: 使用Postman,我称之为Spring boot soap endpoints . endpoints 创建KafkaProducer并向特定主题发送消息 . 我还有一个TaskScheduler来使用该主题 .

The problem: 当调用soap将消息推送到主题时,我收到此错误:

2017-11-14 21:29:31.463 ERROR 6389 --- [ad | producer-3] DomainEntityProducer:对于DomainEntityCommandStream-0过期1条记录:自创建批次以来已经过了30030毫秒加上逗留时间2017-11-14 21:29:31.464 ERROR 6389 --- [nio-8080-exec-6 ] DomainEntityProducer:org.apache.kafka.common.errors.TimeoutException:对于DomainEntityCommandStream-0过期1条记录:自批量创建加上延迟时间以来已经过了30030毫秒

这是我用来推动这个主题的方法:

public DomainEntity push(DomainEntity pDomainEntity) throws Exception {
    logger.log(Level.INFO, "streaming...");
    wKafkaProperties.put("bootstrap.servers", "localhost:9092");
    wKafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    wKafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    KafkaProducer wKafkaProducer = new KafkaProducer(wKafkaProperties);
    ProducerRecord wProducerRecord = new ProducerRecord("DomainEntityCommandStream", getJSON(pDomainEntity));
    wKafkaProducer.send(wProducerRecord, (RecordMetadata r, Exception e) -> {
        if (e != null) {
            logger.log(Level.SEVERE, e.getMessage());
        }
    }).get();
    return pDomainEntity;
}

使用命令shell脚本

./kafka-console-producer.sh --broker-list 10.0.1.15:9092 --topic DomainEntityCommandStream

./kafka-console-consumer.sh --boostrap-server 10.0.1.15:9092 --topic DomainEntityCommandStream - from-beginning

效果很好 .

通过Stackoverflow上的一些相关问题,我试图清除主题:

./kafka-topics.sh --zookeeper 10.0.1.15:9092 --alter --topic DomainEntityCommandStream --config retention.ms = 1000

看看kafka日志,我发现保留时间已经改变了 .

但是,没有运气,我得到同样的错误 .

有效负载非常小,为什么我应该更改batch.size?

<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/"
                  xmlns:gs="http://soap.problem.com">
   <soapenv:Header/>
   <soapenv:Body>
      <gs:streamDomainEntityRequest>
         <gs:domainEntity>
                <gs:name>12345</gs:name>
                <gs:value>Quebec</gs:value>
                <gs:version>666</gs:version>
            </gs:domainEntity>
      </gs:streamDomainEntityRequest>
   </soapenv:Body>
</soapenv:Envelope>

1 回答

  • 0

    使用Docker和Kafka 0.11.0.1映像,您需要将以下环境参数添加到容器中:

    KAFKA_ZOOKEEPER_CONNECT = X.X.X.X:XXXX(您的zookeeper IP或域名:PORT默认2181)KAFKA_ADVERTISED_HOST_NAME = X.X.X.X(您的kafka IP或域名)KAFKA_ADVERTISED_PORT = XXXX(您的kafka PORT号码默认为9092)

    可选:

    KAFKA_BROKER_ID = 999(某个值)KAFKA_CREATE_TOPICS = test:1:1(在开始时创建的某个主题名称)

    如果它不起作用并且您仍然得到相同的消息(“自xxxxx过期X记录:XXXXX ms自批量创建加上延迟时间后已经过去”),您可以尝试从zookeeper清除kafka数据 .

相关问题