我对kafka(也是英语......)很新,我面对这个问题,不能谷歌任何解决方案 .
我使用spring-boot,spring-kafka支持,我在本地机器上安装了kafka_2.11-0.10.1.1(只有一个代理0)
s1.然后我创建主题
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 5 --topic tracking
我的消费者配置:applitions.properties:
kafka.servers.bootstrap=localhost:9092
kafka.topic.tracking=tracking
kafka.group.id=trackingGroup
kafka.client.id=client-1
S2 . 然后我通过更改'kafka.client.id'启动3个消费者并运行spring-boot主类 . 在eclipse控制台上,我可以检查分区分配:
client-1: partitions assigned:[tracking-4, tracking-3]
client-2: partitions assigned:[tracking-2, tracking-1]
client-3: partitions assigned:[tracking-0]
S3 . 启动pruducer向主题发送20条消息,每条消息开始消耗特定分区的消息
S4 . 我关闭了消耗1,kafka自动进行重新 balancer ,新分区分配:
client-1: partitions assigned:[]
client-2: partitions assigned:[tracking-2,tracking-1, tracking-0]
client-3: partitions assigned:[tracking-4,tracking-3]
S5 . 我发现分区'tracking-3'上的消息没有消耗!!
问题可以每次都重现,在新分配的分区中丢失一些消息,你能不能提出任何建议?请帮帮我,谢谢
1 回答
我复制了它;它看起来像kafka本身(使用
auto.comit.enabled=true
)在重新 balancer 上的问题,kafka报告未读分区的"position"(the offset of the <i>next record</i> that will be fetched (if a record with that offset exists)
)作为分区的结尾 .事实上,当我使用kafka-consumer-groups工具时,未读分区的偏移量已经处于“结束”状态 . 当我只用一个消费者运行它时,当它正在读取第一个分区时,我看到......
请注意CURRENT_OFFSET列 .
在下一次运行中,我运行了两次,一次是在处理第一个分区时,稍后再运行一次......
和
看看分区2的当前偏移量从44下降到41 .
禁用自动提交为我解决了...
...
这是我的测试程序:
有 property
我也看到了与0.10.2.0相同的结果 .
EDIT
事实证明这是一个 Spring 天的 Kafka 虫;它适用于启用自动提交,但您必须明确启用它
否则容器假设它是
false
并导致上述奇怪的行为 - 如果启用了自动提交,则看起来像客户端没有_3028747的提交方法 . #288 .我通常建议设置为false,然后选择容器的
AckMode
之一;例如RECORD
在记录收到之后提交,BATCH
在轮询收到的每个批次之后(默认) .