我们有一个微服务,使用spring-boot和spring-cloud-stream生成和使用来自Kafka的消息 .
版本:
spring-boot:1.5.8.RELEASE
spring-cloud-stream:Ditmars.RELEASE
Kafka服务器:kafka_2.11-1.0.0
EDIT: 我们正在使用包含3个Kafka节点的StatefulSets群集和3个Zookeeper节点的群集在Kubernetes环境中工作 .
我们经历了几次旧消息的发生,这些消息在几天前已经处理过的消息中被重新处理 .
几点说明:
- 在此之前,打印了以下日志(有更多类似的行,这只是一个摘要)
撤销以前为群组注册服务分配的分区[]已发现的协调员dev-kafka-1.kube1.iaas.watercorp.com:9092(id:2147483646 rack:null)已成功加入群组注册服务与第320代
-
上面提到的撤销和重新分配分区的事件每隔几个小时就会发生一次 . 而在这些事件中,很少有旧消息被重新消耗 . 在大多数情况下,重新分配不会触发消息消耗 .
-
消息来自不同的分区 .
-
每个 partition 正在重新处理的消息超过1条 .
application.yml:
spring:
cloud:
stream:
kafka:
binder:
brokers: kafka
defaultBrokerPort: 9092
zkNodes: zookeeper
defaultZkPort: 2181
minPartitionCount: 2
replicationFactor: 1
autoCreateTopics: true
autoAddPartitions: true
headers: type,message_id
requiredAcks: 1
configuration:
"[security.protocol]": PLAINTEXT #TODO: This is a workaround. Should be security.protocol
bindings:
user-enrollment-input:
consumer:
autoRebalanceEnabled: true
autoCommitOnError: true
enableDlq: true
user-input:
consumer:
autoRebalanceEnabled: true
autoCommitOnError: true
enableDlq: true
enrollment-mail-output:
producer:
sync: true
configuration:
retries: 10000
enroll-users-output:
producer:
sync: true
configuration:
retries: 10000
default:
binder: kafka
contentType: application/json
group: enrollment-service
consumer:
maxAttempts: 1
producer:
partitionKeyExtractorClass: com.watercorp.messaging.PartitionKeyExtractor
bindings:
user-enrollment-input:
destination: enroll-users
consumer:
concurrency: 10
partitioned: true
user-input:
destination: user
consumer:
concurrency: 5
partitioned: true
enrollment-mail-output:
destination: send-enrollment-mail
producer:
partitionCount: 10
enroll-users-output:
destination: enroll-users
producer:
partitionCount: 10
有没有我可能遗失的配置?什么可以导致这种行为?
1 回答
因此,实际问题是以下故障单中描述的问题:https://issues.apache.org/jira/browse/KAFKA-3806 . 使用建议的解决方法修复它 .