如何获取,设置或重置Kafka Connect连接器/任务/接收器的偏移量?
我可以使用运行 kafka.admin.ConsumerGroupCommand
的 /usr/bin/kafka-consumer-groups
工具来查看我所有常规Kafka消费者群体的偏移量 . 但是,Kafka Connect任务和组不会显示此工具 .
同样,我可以使用zookeeper-shell连接到Zookeeper,我可以看到常规Kafka消费者组的zookeeper条目,但不能查看Kafka Connect接收器的条目 .
3 回答
您无法设置偏移量,但可以使用
kafka-consumer-groups.sh
工具来预先输入.414304_ .连接器的使用者组名称为
connect-*CONNECTOR NAME*
,但您可以仔细检查:unset JMX_PORT; ./bin/kafka-consumer-groups.sh --bootstrap-server *KAFKA HOSTS* --list
要查看当前偏移量:
unset JMX_PORT; ./bin/kafka-consumer-groups.sh --bootstrap-server *KAFKA HOSTS* --group connect-*CONNECTOR NAME* --describe
要向前移动偏移量:
unset JMX_PORT; ./bin/kafka-console-consumer.sh --bootstrap-server *KAFKA HOSTS* --topic *TOPIC* --max-messages 10000 --consumer-property group.id=connect-*CONNECTOR NAME* > /dev/null
我想你可以通过使用
--delete
标志首先删除使用者组来向后移动偏移量 .不要忘记通过Kafka Connect REST API暂停和恢复连接器 .
从0.10.0.0开始,Connect确实是我们未来想要改进的东西,但还没有 .
ConsumerGroupCommand
将是管理Sink连接器偏移的正确工具 . 请注意,源连接器偏移存储在Connect的特殊偏移主题中(它们不像普通的Kafka偏移,因为它们是由源系统定义的,请参见worker configuration docs中的offset.storage.topic
),并且由于sink Connectors使用新的消费者,因此他们赢了将它们的偏移量存储在Zookeeper中 - 所有现代客户端都使用基于Kafka的本地偏移存储 .ConsumerGroupCommand
可以使用这些偏移量,您只需要传递--new-consumer
选项) .在我的情况下(测试将文件读入 生产环境 者并在控制台中使用,仅在本地),我只是在 生产环境 者输出中看到了这个:
所以我想打开它,但它是二进制的,有一些难以识别的字符 .
我删除了它(重命名它也有效),然后我可以写入同一个文件并再次从消费者那里获取文件内容 . You have to restart the console producer to take effect because it attempts to read the offset file, if not there, create a new one, so that the offset is reset.
如果要重置它而不删除,可以使用:
您可以通过以下方式检查所有组名:
并检查每组的详细信息:
在 生产环境 环境中,此偏移量由zookeeper管理,因此需要更多步骤(和警告) . 你可以参考这个页面:
https://metabroadcast.com/blog/resetting-kafka-offsets https://community.hortonworks.com/articles/81357/manually-resetting-offset-for-a-kafka-topic.html
脚步: