首页 文章

Kafka Connect offset.storage.topic没有接收消息(即如何访问Kafka Connect偏移元数据?)

提问于
浏览
1

我正在设置一个Kafka Connect分布式模式应用程序,它将是一个Kafka到S3管道 . 我正在使用Kafka 0.10.1.0-1和Kafka Connect 3.1.1-1 . 到目前为止,事情进展顺利,但对于我正在使用的大型系统而言,一个重要的方面需要知道Kafka - > FileSystem管道的偏移信息 . 根据文档, offset.storage.topic 配置将是分布式模式应用程序用于存储偏移量信息的位置 . 考虑到Kafka如何在Kafka中存储消费者抵消,这是有道理的 . 但是,在使用FileStreamSinkConnector进行一些测试后,没有任何内容写入我的 offset.storage.topic ,这是默认值: connect-offsets .

具体来说,我使用Python Kafka 生产环境 者将数据推送到主题,并使用Kafka Connect和FileStreamSinkConnect将主题中的数据输出到文件 . 这工作和行为与我期望连接器的行为一样 . 此外,当我停止连接器并启动连接器时,应用程序会记住主题中的状态,并且没有数据重复 . 但是,当我转到 offset.storage.topic 以查看存储的元数据偏移量时,主题中没有任何内容 .

这是我使用的命令:

kafka-console-consumer --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic connect-offsets --from-beginning

让这个命令运行一分钟后我收到这条消息:

Processed a total of 0 messages

总结一下,我有两个问题:

  • 为什么即使我的分布式应用程序正确保持状态,也不会将偏移元数据写入应该存储的主题?

  • 如何访问Kafka Connect分布式模式应用程序的偏移元数据信息?这是我团队的Lambda Architecture实现系统所必需的100% .

谢谢您的帮助 .

3 回答

  • 2
    • Liju是正确的,连接偏移用于跟踪源连接器(具有 生产环境 者但不具有消费者)的偏移 . 接收器具有消费者和跟踪通常方式的偏移 - __consumer_offsets主题

    • 查看上次提交的抵消的最佳方法是使用消费者群组工具:

    bin / kafka-consumer-groups.sh --group connect-elastic-login-connector --bootstrap-server localhost:9092 --describe

    组名称始终为“connect-”和连接器名称(在我的示例中为elastic-login-connector) . 这将显示该组提交的最新偏移量,该偏移量基本上承认直到此偏移量的所有消息都写入Elastic .

  • 1

    偏移可能是提交给kafka默认偏移提交主题,即_consumer_offsets

  • 0

    由Confluent发布的新的S3 Connector可能对您感兴趣 .

    根据您的描述,也许它可以显着简化您将记录从Kafka导出到S3存储桶的目标 .

相关问题