首页 文章

汇总的kafka到s3连接失败,出现ERROR线程中的意外异常[KafkaBasedLog工作线程 -

提问于
浏览
1

我在EC2上设置了从kafka读取并写入S3的汇合(4.0)连接器 .

独立尝试很顺利:

bin / connect-standalone etc / standalone / example-connect-worker.properties etc / standalone / example-connect-s3-sink.properties

但是,分布式版本仍然失败

[2018-01-30 21:26:05,860] ERROR Unexpected exception in Thread[KafkaBasedLog Work Thread - connect-configs,5,main] (org.apache.kafka.connect.util.KafkaBasedLog:334)
java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1097)
	at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:256)
	at org.apache.kafka.connect.util.KafkaBasedLog.access$500(KafkaBasedLog.java:69)
	at org.apache.kafka.connect.util.KafkaBasedLog$WorkThread.run(KafkaBasedLog.java:327)

我只是想首先使用连接器类等于FileStreamSinkConnector

接收器conf文件如下:

name=local-file-sink
#connector.class=FileStreamSink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
file=test.sink.txt
topics=tests3

s3.bucket=tests3
s3.prefix=tests3
s3.endpoint=http://localhost:9090
s3.path_style=true
local.buffer.dir=/tmp/connect-system-test

非常感谢!

1 回答

  • 1

    当您使用 ./bin/connect-distributed 启动分布式Connect工作程序时,您可能只通过命令行提供工作程序的属性 .

    要通过将其配置发布到工作线程的REST endpoints 来加载连接器,您可以使用 curl 或等效命令 .

    例如:

    curl -X POST -H "Content-Type: application/json" --data @config.json http://localhost:8083/connectors

    其中 config.json 是包含连接器属性的文件 .

    更多信息:https://docs.confluent.io/current/connect/managing.html#distributed-example

相关问题