
Kafka Connect HDFS sink connector fails even though the JSON data contains schema and payload fields

I am trying to move JSON data from Kafka to HDFS using the Kafka Connect HDFS sink connector.

Even though the JSON data in Kafka has the schema and payload fields, the Kafka Connect task fails with the following error:

org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields.

Data in Kafka:

./bin/kafka-console-consumer --topic test_hdfs_json_schema_payload_1 --zookeeper localhost:2181 --from-beginning

{"schema": {"type": "struct","fields": [{"type": "string","optional": false,"field": "Name"}, {"type": "string","optional": false,"field": "company"}],"optional": false,"name": "Person"},"payload": {"Name": "deepak","company": "BT"}}
{"schema": {"type": "struct","fields": [{"type": "string","optional": false,"field": "Name"}, {"type": "string","optional": false,"field": "company"}],"optional": false,"name": "Person"},"payload": {"Name": "sufi","company": "BT"}}
{"schema": {"type": "struct","fields": [{"type": "string","optional": false,"field": "Name"}, {"type": "string","optional": false,"field": "company"}],"optional": false,"name": "Person"},"payload": {"Name": "vikas","company": "BT"}}

I submitted the HDFS sink job using the command below:

curl -X POST -H "Content-Type: application/json" --data '{"name": "connect-cluster-15may-308pm", "config": {"connector.class":"io.confluent.connect.hdfs.HdfsSinkConnector", "tasks.max":"2", "hdfs.url": "hdfs://localhost:9000","flush.size": "1","topics":"test_hdfs_json_schema_payload_1","topics.dir":"/deepak/kafka-connect/trial1"}}' http://localhost:8083/connectors

Distributed Kafka Connect worker configuration:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

Error message:

http://localhost:8083/connectors/connect-cluster-15may-308pm/tasks/0/status

{
    "state": "FAILED",
    "trace": "org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.\n\tat org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:309)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:400)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n",
    "id": 0,
    "worker_id": "127.0.0.1:8083"
}

1 Answer

Which version of Kafka Connect are you using? It helps to know this when working backwards from the stack trace to determine the source of the error.

I think what is happening is that you have data in the values, but not in the keys. Since both key.converter and value.converter are set to JsonConverter with schemas.enable=true, both keys and values are expected to arrive in the envelope format containing schema and payload fields. However, I am guessing your keys are all null.
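
For illustration only (both lines below are hypothetical, not pulled from your topic), this is the envelope shape JsonConverter with schemas.enable=true expects for every non-null key, next to the bare null key I suspect your producer is writing:

{"schema": {"type": "string", "optional": false}, "payload": "some-key"}
null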

This is the reverse of the problem described in https://issues.apache.org/jira/browse/KAFKA-3832, where JsonConverter never generates true null values; instead, it always generates the envelope containing the expected optional schema with a null payload. In this case, converting from Kafka to Connect's data API does not work because it expects that same envelope format in the keys.
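
As a concrete sketch of that JIRA behavior (my reading of it; the exact envelope emitted for a schemaless null is an assumption), serializing a null value with schemas enabled yields an envelope rather than a literal null:

{"schema": null, "payload": null}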

You can verify that this is the problem by adding --property print.key=true to your console consumer command. If it prints out null keys, the issue is that the JsonConverter cannot decode them.
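
For example, reusing the consumer command from the question with the key printing enabled:

./bin/kafka-console-consumer --topic test_hdfs_json_schema_payload_1 --zookeeper localhost:2181 --from-beginning --property print.key=true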

An easy workaround is to use a different Converter for the keys, one that does not care about null values, since there is no data in the keys anyway. One that ships with Kafka Connect is org.apache.kafka.connect.storage.StringConverter.
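
A minimal sketch of that change in the distributed worker configuration (values keep the JSON envelope handling; key.converter.schemas.enable is dropped because StringConverter does not use it):

# keys as plain strings (tolerates null keys); value settings unchanged
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true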
