
Spring Boot KafkaListener stops consuming messages after running for a while

I have a Spring Boot project that runs several Kafka consumers (@KafkaListener) against a Confluent Kafka topic with 8 partitions. The concurrency of each consumer is set to 1. About a million messages are loaded into the topic from a file, and the consumers process them in batches to validate, process, and update a database.

The consumer factory has the following settings: max.poll.records=10000, fetch.min.bytes=100000, fetch.max.wait.ms=1000, session.timeout.ms=240000.

Update 06/04: Here are the consumer factory settings. This is Spring-Kafka-1.3.1.RELEASE. The Confluent Kafka broker is version

@Bean
public ConsumerFactory<String, ListingMessage> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10000);
    props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 100000);
    props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 1000);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 240000);

    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
        new JsonDeserializer<>(ListingMessage.class));
}

@Bean(KAFKA_LISTENER_CONTAINER_FACTORY) @Autowired
public ConcurrentKafkaListenerContainerFactory<String, ListingMessage> listingKafkaListenerContainerFactory(
    ConsumerFactory<String, ListingMessage> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, ListingMessage> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setConcurrency(1);
    factory.setAutoStartup(false);
    factory.setBatchListener(true);
    return factory;
}

Note: The container factory has auto-startup set to false. This is so the consumers can be started/stopped manually while loading large files.
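A minimal sketch of that manual start/stop, assuming the @KafkaListener declares an id (here `listingListener`, a hypothetical name) so the container can be looked up in Spring Kafka's KafkaListenerEndpointRegistry:

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.stereotype.Service;

@Service
public class ListingConsumerControl {

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    // Start the listener container just before loading a large file.
    public void startListener() {
        registry.getListenerContainer("listingListener").start();
    }

    // Stop it again once the file has been processed.
    public void stopListener() {
        registry.getListenerContainer("listingListener").stop();
    }
}
```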

After running for about an hour (the time varies), the consumers stop consuming messages from the topic, even though many messages are still available. There is a log statement in the consume method that stops printing in the logs.

I track the consumers' state with the "./kafka-consumer-groups" command, and after a while I see that there are no consumers in the group.

$ ./kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group  group_name

There are no errors in the logs for this consumer failure. The consumer method is wrapped in a try-catch block, so it would catch any exception thrown while processing messages.

How can we design the Spring-Kafka consumer so that it restarts a consumer when it stops consuming? Is there a listener that can log the exact point at which the consumer stops? Is this caused by setting the concurrency to 1? The reason I have to keep the concurrency at 1 is that other consumers slow down if this consumer is given more concurrency.
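On the "is there a listener" part: Spring Kafka can publish a ListenerContainerIdleEvent when no records arrive for a configured interval, which gives a hook to log (or react to) a stalled container. A sketch, assuming an idleEventInterval has been set on the container properties:

```java
import org.springframework.context.event.EventListener;
import org.springframework.kafka.event.ListenerContainerIdleEvent;
import org.springframework.stereotype.Component;

@Component
public class IdleConsumerMonitor {

    // Fired when the container has received no records for idleEventInterval ms;
    // enable it with factory.getContainerProperties().setIdleEventInterval(60000L).
    @EventListener
    public void onIdle(ListenerContainerIdleEvent event) {
        // Log the idle container; a restart could be triggered from here if desired.
        System.out.println("Consumer idle: " + event);
    }
}
```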

1 Answer

  • 2

    I just tested this with max.poll.interval.ms=30000 (30 seconds), paused the listener, and resumed it after 30 seconds; I saw this in the logs...

    2018-06-04 18:35:59.361  INFO 4191 --- [      foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so50687794-0]
    foo
    
    2018-06-04 18:37:07.347 ERROR 4191 --- [      foo-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: null
    
    org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:722) ~[kafka-clients-1.0.1.jar:na]
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:600) ~[kafka-clients-1.0.1.jar:na]
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1250) ~[kafka-clients-1.0.1.jar:na]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:1329) ~[spring-kafka-2.1.6.RELEASE.jar:2.1.6.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:1190) ~[spring-kafka-2.1.6.RELEASE.jar:2.1.6.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:688) ~[spring-kafka-2.1.6.RELEASE.jar:2.1.6.RELEASE]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_131]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_131]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
    
    2018-06-04 18:37:07.350  INFO 4191 --- [      foo-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=foo] Revoking previously assigned partitions [so50687794-0]
    2018-06-04 18:37:07.351  INFO 4191 --- [      foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked: [so50687794-0]
    2018-06-04 18:37:07.351  INFO 4191 --- [      foo-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=foo] (Re-)joining group
    2018-06-04 18:37:10.400  INFO 4191 --- [      foo-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=foo] Successfully joined group with generation 15
    2018-06-04 18:37:10.401  INFO 4191 --- [      foo-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=foo] Setting newly assigned partitions [so50687794-0]
    2018-06-04 18:37:10.445  INFO 4191 --- [      foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so50687794-0]
    foo
    

    You can see that after the rebalance the consumer is added back and the same message is redelivered; that is what I would expect.

    I got the same results, even with 1.3.1.
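    Following the hint in the CommitFailedException message above, the practical fix is to make sure a full batch can be processed within max.poll.interval.ms, either by raising that interval or by shrinking the batch. A sketch against the consumerFactory() from the question (the values are illustrative assumptions, not recommendations):

```java
// Allow more time between poll() calls and fetch smaller batches,
// so slow batch processing no longer triggers a rebalance.
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000); // 10 minutes
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
```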
