首页 文章

Kafka消息被重新处理

提问于
浏览
0

我们有一个微服务,使用spring-boot和spring-cloud-stream生成和使用来自Kafka的消息 .
版本:
spring-boot:1.5.8.RELEASE
spring-cloud-stream:Ditmars.RELEASE
Kafka服务器:kafka_2.11-1.0.0

EDIT: 我们正在使用包含3个Kafka节点的StatefulSets群集和3个Zookeeper节点的群集在Kubernetes环境中工作 .

我们经历了几次旧消息的发生,这些消息在几天前已经处理过的消息中被重新处理 .
几点说明:

  • 在此之前,打印了以下日志(有更多类似的行,这只是一个摘要)

撤销以前为群组注册服务分配的分区[]已发现的协调员dev-kafka-1.kube1.iaas.watercorp.com:9092(id:2147483646 rack:null)已成功加入群组注册服务与第320代

  • 上面提到的撤销和重新分配分区的事件每隔几个小时就会发生一次 . 而在这些事件中,很少有旧消息被重新消耗 . 在大多数情况下,重新分配不会触发消息消耗 .

  • 消息来自不同的分区 .

  • 每个 partition 正在重新处理的消息超过1条 .

application.yml:

spring:
  cloud:
      stream:
        kafka:
          binder:
            brokers: kafka
            defaultBrokerPort: 9092
            zkNodes: zookeeper
            defaultZkPort: 2181
            minPartitionCount: 2
            replicationFactor: 1
            autoCreateTopics: true
            autoAddPartitions: true
            headers: type,message_id
            requiredAcks: 1
            configuration:
              "[security.protocol]": PLAINTEXT #TODO: This is a workaround. Should be security.protocol
          bindings:
            user-enrollment-input:
              consumer:
                autoRebalanceEnabled: true
                autoCommitOnError: true
                enableDlq: true
            user-input:
              consumer:
                autoRebalanceEnabled: true
                autoCommitOnError: true
                enableDlq: true
            enrollment-mail-output:
              producer:
                sync: true
                configuration:
                  retries: 10000
            enroll-users-output:
              producer:
                sync: true
                configuration:
                  retries: 10000
        default:
          binder: kafka
          contentType: application/json
          group: enrollment-service
          consumer:
            maxAttempts: 1
          producer:
            partitionKeyExtractorClass: com.watercorp.messaging.PartitionKeyExtractor
        bindings:
          user-enrollment-input:
            destination: enroll-users
            consumer:
              concurrency: 10
              partitioned: true
          user-input:
            destination: user
            consumer:
              concurrency: 5
              partitioned: true
          enrollment-mail-output:
            destination: send-enrollment-mail
            producer:
              partitionCount: 10
          enroll-users-output:
            destination: enroll-users
            producer:
              partitionCount: 10

有没有我可能遗失的配置?什么可以导致这种行为?

1 回答

相关问题