首页 文章

重新启动Kafka Connect S3 Sink任务失去位置,完全重写所有内容

提问于
浏览
4

重新启动Kafka Connect S3接收器任务后,它重新开始从主题开头一直写入并写入旧记录的重复副本 . 换句话说,Kafka Connect似乎失去了它的位置 .

所以,我想Kafka Connect将当前偏移位置信息存储在内部 connect-offsets 主题中 . 那个话题是空的,我认为这是问题的一部分 .

另外两个内部主题 connect-statusesconnect-configs 不为空 . connect-statuses 有52个条目 . connect-configs 有6个条目;我配置的两个接收器连接器分别为三个: connector-<name>task-<name>-0commit-<name> .

在运行此文档之前,我手动创建了文档中指定的内部Kafka Connect主题:

/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-statuses --replication-factor 3 --partitions 10 --config cleanup.policy=compact

我可以验证 connect-offsets 主题似乎是正确创建的:

/usr/bin/kafka-topics --zookeeper localhost:2181 --describe --topic connect-offsets
Topic:connect-offsets   PartitionCount:50   ReplicationFactor:3 Configs:cleanup.policy=compact
    Topic: connect-offsets  Partition: 0    Leader: 1   Replicas: 1,2,3 Isr: 1,2,3
    Topic: connect-offsets  Partition: 1    Leader: 2   Replicas: 2,3,1 Isr: 2,3,1
    Topic: connect-offsets  Partition: 2    Leader: 3   Replicas: 3,1,2 Isr: 3,1,2
  <snip>

这是运行Kafka 10.2.1的运行Confluent Platform v3.2.1的三服务器集群 .

connect-offsets 应该是空的吗?为什么在重新启动任务时Kafka Connect会在主题开头重新启动?

UPDATE :回应Randall Hauch的回答 .

  • 有关源连接器偏移与接收器连接器偏移的说明解释为空 connect-offsets . 谢谢你的解释!

  • 我绝对不会改变连接器名称 .

  • 如果连接器关闭了大约五天并且之后重新启动,是否有任何原因导致连接器偏移位置到期并重置?我看 __consumer_offsetscleanup.policy=compact

  • auto.offset.reset 只应在 __consumer_offsets 中没有位置时生效,对吗?

我主要使用系统默认设置 . 我的Sink配置JSON如下 . 我使用一个非常简单的自定义分区程序在Avro日期时间字段而不是挂钟时间上进行分区 . 该功能似乎已在Confluent v3.2.2中添加,因此我不需要该功能的自定义插件 . 我希望跳过Confluent v3.2.2并在可用时直接进入v3.3.0 .

{
  "name": "my-s3-sink",

  "tasks.max": 1,
  "topics": "my-topic",
  "flush.size": 10000,

  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
  "schema.generator.class": "io.confluent.connect.storage.hive.schema.TimeBasedSchemaGenerator",
  "partitioner.class": "mycompany.partitioner.TimeFieldPartitioner",

  "s3.bucket.name": "my-bucket",
  "s3.region": "us-west-2",

  "partition.field.name": "timestamp",

  "locale": "us",
  "timezone": "UTC",
  "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",

  "schema.compatibility": "NONE",

  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://localhost:8081",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://localhost:8081"
}

2 回答

  • 6

    Kafka消费者的默认偏移保留期为24小时(1440分钟) . 如果您停止连接器并因此超过24小时不进行新提交,则您的偏移将过期,并且当您重新启动时,您将重新开始作为新的使用者 . 您可以使用 offsets.retention.minutes 参数修改 __consumer_offsets 主题上的保留期

  • 6

    Kafka Connect使用 connect-offsets 主题(或您命名的任何主题)来存储源连接器的偏移量,但使用普通的Kafka使用者组机制存储接收器连接器偏移量 .

    连接器可能重新启动的一个原因是连接器名称是否更改 . 连接器名称用于定义使用者组的名称,因此如果更改连接器的名称,则在重新启动时,连接器将使用不同的使用者组,其使用者将从头开始 .

    另一个原因可能是Kafka Connect使用者被配置为每次从头开始,通过 consumer.auto.offset.reset=earliest .

    S3连接器版本3.3.0(即将推出)修复了几个问题,其中一些问题会影响轮换时间或架构的工作方式 . 您没有提供配置,因此很难说这些是否会导致您看到的行为 .

相关问题