
Handle failure if the Kafka Broker fails


I have a Kafka broker running and messages are being consumed successfully, but I want to handle the case where the Kafka Broker fails, on the Kafka Consumer side.

I have read this thread, but learned from it that the failure is only logged at DEBUG level. I would like to know whether I can handle this manually on an event trigger, because I want to deal with the Kafka broker's failure myself. Does Spring Kafka provide a way to handle this scenario?

Let me know if more details are needed. I would greatly appreciate any advice that points me in the right direction. Thanks.

EDIT 1:

As @Artem answered, I tried this in my KafkaConsumer:

@EventListener
public void handleEvent(NonResponsiveConsumerEvent event) {
    // Expected to fire only when the consumer stops responding to poll()
    LOGGER.info("*****************************************");
    LOGGER.info("Hello NonResponsiveConsumer {}", event);
    LOGGER.info("*****************************************");
}

This event is triggered even when the Kafka server is running (when I first start the application). See the following logs:

....
....
2017-12-04 13:08:02,177 INFO o.s.c.s.DefaultLifecycleProcessor - Starting beans in phase 0
2017-12-04 13:08:02,218 INFO o.a.k.c.c.ConsumerConfig - ConsumerConfig values: 
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [52.214.67.60:9091]
    check.crcs = true
    client.id = 
    connections.max.idle.ms = 540000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = workerListener
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.springframework.kafka.support.serializer.JsonDeserializer

2017-12-04 13:08:02,346 INFO o.a.k.c.u.AppInfoParser - Kafka version : 0.11.0.0
2017-12-04 13:08:02,346 INFO o.a.k.c.u.AppInfoParser - Kafka commitId : cb8625948210849f
2017-12-04 13:08:02,350 INFO o.s.s.c.ThreadPoolTaskScheduler - Initializing ExecutorService 
2017-12-04 13:08:02,363 INFO o.s.b.a.e.j.EndpointMBeanExporter - Located managed bean 'auditEventsEndpoint': registering with JMX server as MBean [org.springframework.boot:type=Endpoint,name=auditEventsEndpoint]
2017-12-04 13:08:02,397 INFO c.t.m.w.c.k.c.KafkaConsumer - *****************************************
2017-12-04 13:08:02,397 INFO c.t.m.w.c.k.c.KafkaConsumer - Hello NonResponsiveConsumer ListenerContainerIdleEvent [timeSinceLastPoll=1.51237491E9s, listenerId=workerListener-0, container=KafkaMessageListenerContainer [id=workerListener-0, clientIndex=-0, topicPartitions=null], topicPartitions=null]
2017-12-04 13:08:02,403 INFO c.t.m.w.c.k.c.KafkaConsumer - *****************************************
....
....

EDIT 2:

The issue was resolved by upgrading spring-kafka to 1.3.2

1 Answer

    Starting with version 1.3.1, there is a:

    /**
     * An event that is emitted when a consumer is not responding to
     * the poll; a possible indication that the broker is down.
     *
     * @author Gary Russell
     * @since 1.3.1
     *
     */
    @SuppressWarnings("serial")
    public class NonResponsiveConsumerEvent extends KafkaEvent {
    

    Quoting the documentation:

    In addition, if the broker is unreachable (at the time of writing), the consumer poll() method does not exit, so no messages are received and idle events cannot be generated. To solve this issue, the container will publish a NonResponsiveConsumerEvent if a poll does not return within 3x the pollInterval property. By default, this check is performed in each container every 30 seconds. You can modify this behavior by setting the monitorInterval and noPollThreshold properties in the ContainerProperties when configuring the listener container. Receiving such an event allows you to stop the container, thus waking the consumer so that it can terminate.
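
    Putting the quoted documentation together, here is a minimal sketch of one way to wire this up (assuming spring-kafka 1.3.2+; the configuration class, the 10-second monitor interval, the 2.0 threshold, and the choice to stop the container via the event source are illustrative assumptions, not code from the original answer):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.event.EventListener;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.event.NonResponsiveConsumerEvent;
    import org.springframework.kafka.listener.MessageListenerContainer;

    @Configuration
    public class KafkaBrokerFailureConfig {

        private static final Logger LOGGER =
                LoggerFactory.getLogger(KafkaBrokerFailureConfig.class);

        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
                ConsumerFactory<String, Object> consumerFactory) {
            ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory);
            // Run the non-responsive check every 10 seconds instead of the default 30.
            factory.getContainerProperties().setMonitorInterval(10);
            // Publish the event after 2x (instead of the default 3x) the threshold
            // with no completed poll.
            factory.getContainerProperties().setNoPollThreshold(2.0f);
            return factory;
        }

        // Receiving the event lets us stop the container, waking the consumer
        // so it can terminate. (Assumes the event's source is the publishing
        // listener container.)
        @EventListener
        public void onNonResponsiveConsumer(NonResponsiveConsumerEvent event) {
            LOGGER.warn("Consumer stopped polling; the broker may be down: {}", event);
            ((MessageListenerContainer) event.getSource()).stop();
        }
    }

    Since the consumer thread is stuck inside poll() in this scenario, the event comes from the container's monitoring task rather than the consumer thread itself, which is why stopping the container from the event listener is what wakes the consumer up.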
