我正在设置一个Kafka Connect分布式模式应用程序,它将是一个Kafka到S3管道 . 我正在使用Kafka 0.10.1.0-1和Kafka Connect 3.1.1-1 . 到目前为止,事情进展顺利,但对于我正在使用的大型系统而言,一个重要的方面需要知道Kafka - > FileSystem管道的偏移信息 . 根据文档, offset.storage.topic
配置将是分布式模式应用程序用于存储偏移量信息的位置 . 考虑到Kafka如何在Kafka中存储消费者抵消,这是有道理的 . 但是,在使用FileStreamSinkConnector进行一些测试后,没有任何内容写入我的 offset.storage.topic
,这是默认值: connect-offsets
.
具体来说,我使用Python Kafka 生产环境 者将数据推送到主题,并使用Kafka Connect和FileStreamSinkConnect将主题中的数据输出到文件 . 这工作和行为与我期望连接器的行为一样 . 此外,当我停止连接器并启动连接器时,应用程序会记住主题中的状态,并且没有数据重复 . 但是,当我转到 offset.storage.topic
以查看存储的元数据偏移量时,主题中没有任何内容 .
这是我使用的命令:
kafka-console-consumer --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic connect-offsets --from-beginning
让这个命令运行一分钟后我收到这条消息:
Processed a total of 0 messages
总结一下,我有两个问题:
-
为什么即使我的分布式应用程序正确保持状态,也不会将偏移元数据写入应该存储的主题?
-
如何访问Kafka Connect分布式模式应用程序的偏移元数据信息?这是我团队的Lambda Architecture实现系统所必需的100% .
谢谢您的帮助 .
3 回答
Liju是正确的,连接偏移用于跟踪源连接器(具有 生产环境 者但不具有消费者)的偏移 . 接收器具有消费者和跟踪通常方式的偏移 - __consumer_offsets主题
查看上次提交的抵消的最佳方法是使用消费者群组工具:
bin / kafka-consumer-groups.sh --group connect-elastic-login-connector --bootstrap-server localhost:9092 --describe
组名称始终为“connect-”和连接器名称(在我的示例中为elastic-login-connector) . 这将显示该组提交的最新偏移量,该偏移量基本上承认直到此偏移量的所有消息都写入Elastic .
偏移可能是提交给kafka默认偏移提交主题,即_consumer_offsets
由Confluent发布的新的S3 Connector可能对您感兴趣 .
根据您的描述,也许它可以显着简化您将记录从Kafka导出到S3存储桶的目标 .