重新启动Kafka Connect S3接收器任务后,它重新开始从主题开头一直写入并写入旧记录的重复副本 . 换句话说,Kafka Connect似乎失去了它的位置 .
所以,我想Kafka Connect将当前偏移位置信息存储在内部 connect-offsets
主题中 . 那个话题是空的,我认为这是问题的一部分 .
另外两个内部主题 connect-statuses
和 connect-configs
不为空 . connect-statuses
有52个条目 . connect-configs
有6个条目;我配置的两个接收器连接器分别为三个: connector-<name>
, task-<name>-0
, commit-<name>
.
在运行此文档之前,我手动创建了文档中指定的内部Kafka Connect主题:
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-statuses --replication-factor 3 --partitions 10 --config cleanup.policy=compact
我可以验证 connect-offsets
主题似乎是正确创建的:
/usr/bin/kafka-topics --zookeeper localhost:2181 --describe --topic connect-offsets
Topic:connect-offsets PartitionCount:50 ReplicationFactor:3 Configs:cleanup.policy=compact
Topic: connect-offsets Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: connect-offsets Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: connect-offsets Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
<snip>
这是运行Kafka 10.2.1的运行Confluent Platform v3.2.1的三服务器集群 .
connect-offsets
应该是空的吗?为什么在重新启动任务时Kafka Connect会在主题开头重新启动?
UPDATE :回应Randall Hauch的回答 .
-
有关源连接器偏移与接收器连接器偏移的说明解释为空
connect-offsets
. 谢谢你的解释! -
我绝对不会改变连接器名称 .
-
如果连接器关闭了大约五天并且之后重新启动,是否有任何原因导致连接器偏移位置到期并重置?我看
__consumer_offsets
有cleanup.policy=compact
-
auto.offset.reset
只应在__consumer_offsets
中没有位置时生效,对吗?
我主要使用系统默认设置 . 我的Sink配置JSON如下 . 我使用一个非常简单的自定义分区程序在Avro日期时间字段而不是挂钟时间上进行分区 . 该功能似乎已在Confluent v3.2.2中添加,因此我不需要该功能的自定义插件 . 我希望跳过Confluent v3.2.2并在可用时直接进入v3.3.0 .
{
"name": "my-s3-sink",
"tasks.max": 1,
"topics": "my-topic",
"flush.size": 10000,
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.TimeBasedSchemaGenerator",
"partitioner.class": "mycompany.partitioner.TimeFieldPartitioner",
"s3.bucket.name": "my-bucket",
"s3.region": "us-west-2",
"partition.field.name": "timestamp",
"locale": "us",
"timezone": "UTC",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"schema.compatibility": "NONE",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081"
}
2 回答
Kafka消费者的默认偏移保留期为24小时(1440分钟) . 如果您停止连接器并因此超过24小时不进行新提交,则您的偏移将过期,并且当您重新启动时,您将重新开始作为新的使用者 . 您可以使用
offsets.retention.minutes
参数修改__consumer_offsets
主题上的保留期Kafka Connect使用
connect-offsets
主题(或您命名的任何主题)来存储源连接器的偏移量,但使用普通的Kafka使用者组机制存储接收器连接器偏移量 .连接器可能重新启动的一个原因是连接器名称是否更改 . 连接器名称用于定义使用者组的名称,因此如果更改连接器的名称,则在重新启动时,连接器将使用不同的使用者组,其使用者将从头开始 .
另一个原因可能是Kafka Connect使用者被配置为每次从头开始,通过
consumer.auto.offset.reset=earliest
.S3连接器版本3.3.0(即将推出)修复了几个问题,其中一些问题会影响轮换时间或架构的工作方式 . 您没有提供配置,因此很难说这些是否会导致您看到的行为 .