首页 文章

Kafka Connect Offsets . get / set方法?

提问于
浏览
8

如何获取,设置或重置Kafka Connect连接器/任务/接收器的偏移量?

我可以使用运行 kafka.admin.ConsumerGroupCommand/usr/bin/kafka-consumer-groups 工具来查看我所有常规Kafka消费者群体的偏移量 . 但是,Kafka Connect任务和组不会显示此工具 .

同样,我可以使用zookeeper-shell连接到Zookeeper,我可以看到常规Kafka消费者组的zookeeper条目,但不能查看Kafka Connect接收器的条目 .

3 回答

  • 0

    您无法设置偏移量,但可以使用 kafka-consumer-groups.sh 工具来预先输入.414304_ .

    连接器的使用者组名称为 connect-*CONNECTOR NAME* ,但您可以仔细检查: unset JMX_PORT; ./bin/kafka-consumer-groups.sh --bootstrap-server *KAFKA HOSTS* --list

    要查看当前偏移量: unset JMX_PORT; ./bin/kafka-consumer-groups.sh --bootstrap-server *KAFKA HOSTS* --group connect-*CONNECTOR NAME* --describe

    要向前移动偏移量: unset JMX_PORT; ./bin/kafka-console-consumer.sh --bootstrap-server *KAFKA HOSTS* --topic *TOPIC* --max-messages 10000 --consumer-property group.id=connect-*CONNECTOR NAME* > /dev/null

    我想你可以通过使用 --delete 标志首先删除使用者组来向后移动偏移量 .

    不要忘记通过Kafka Connect REST API暂停和恢复连接器 .

  • 3

    从0.10.0.0开始,Connect确实是我们未来想要改进的东西,但还没有 . ConsumerGroupCommand 将是管理Sink连接器偏移的正确工具 . 请注意,源连接器偏移存储在Connect的特殊偏移主题中(它们不像普通的Kafka偏移,因为它们是由源系统定义的,请参见worker configuration docs中的 offset.storage.topic ),并且由于sink Connectors使用新的消费者,因此他们赢了将它们的偏移量存储在Zookeeper中 - 所有现代客户端都使用基于Kafka的本地偏移存储 . ConsumerGroupCommand 可以使用这些偏移量,您只需要传递 --new-consumer 选项) .

  • 9

    在我的情况下(测试将文件读入 生产环境 者并在控制台中使用,仅在本地),我只是在 生产环境 者输出中看到了这个:

    offset.storage.file.filename=/tmp/connect.offsets
    

    所以我想打开它,但它是二进制的,有一些难以识别的字符 .

    我删除了它(重命名它也有效),然后我可以写入同一个文件并再次从消费者那里获取文件内容 . You have to restart the console producer to take effect because it attempts to read the offset file, if not there, create a new one, so that the offset is reset.

    如果要重置它而不删除,可以使用:

    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group <group-name> --reset-offsets --to-earliest --topic <topic_name>
    

    您可以通过以下方式检查所有组名:

    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
    

    并检查每组的详细信息:

    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group <group_name> --describe
    

    在 生产环境 环境中,此偏移量由zookeeper管理,因此需要更多步骤(和警告) . 你可以参考这个页面:

    https://metabroadcast.com/blog/resetting-kafka-offsets https://community.hortonworks.com/articles/81357/manually-resetting-offset-for-a-kafka-topic.html

    脚步:

    kafka-topics --list --zookeeper localhost:2181
    kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic vital_signs --time -1 // -1 for largest, -2 for smallest
    
    set /consumers/{yourConsumerGroup}/offsets/{yourFancyTopic}/{partitionId} {newOffset}
    

相关问题