首页 文章

数据库关闭时Kafka JDBC Sink Connector的最大重试次数和重试间隔

提问于
浏览
1

我试图在数据库关闭时测试和评估Kafka JDBC Sink连接器的行为 .

在数据库关闭时在Kafka中收到新消息时,会报告以下错误:

INFO Unable to connect to database on attempt 1/3. Will retry in 10000 ms. (io.confluent.connect.jdbc.util.CachedConnectionProvider:91)
com.microsoft.sqlserver.jdbc.SQLServerException: Unable to access availability database 'Giorgos' because the database replica is not in the PRIMARY or SECONDARY role. Connections to an availability database is permitted only when the database replica is in the PRIMARY or SECONDARY role. Try the operation again later.

在重试之后,将报告以下错误并且任务将被终止:

ERROR WorkerSinkTask{id=sink-giorgos_test-2} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:173)

在哪里可以修改退出次数和重试间隔,根据第一个错误设置为 10000 ms?

说我希望 Worker 继续尝试连接数据库5分钟 . 我应该配置哪些参数?

EDIT to include required files:

sink-file.properties

name=sink-test
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=GIORGOS.TOPIC
connection.url=jdbc:sqlserver://ip:port;DatabaseName=Streaming;user=myuser;password=mypass
auto.create=true

# DB failover
max.retries=10
retry.backoff.ms=10000

pk.mode=record_value
pk.fields=ID
insert.mode=upsert
transforms=ExtractField
transforms.ExtractField.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.ExtractField.field=data

worker.properties(我在分布式模式下运行时有多个文件)

bootstrap.servers=localhost:9092
group.id=connect-cluster

key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081



config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1



internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

rest.port=8040
rest.advertised.port=8040

plugin.path=/usr/share/java

1 回答

相关问题