首页 文章

无法阅读Kafka主题avro消息

提问于
浏览
1

Debezium连接器的Kafka连接事件是Avro编码的 .

在传递给Kafka connect独立服务的connect-standalone.properties中提到了以下内容 .

key.converter=io.confluent.connect.avro.AvroConverter
value.confluent=io.confluent.connect.avro.AvroConverter
internal.key.converter=io.confluent.connect.avro.AvroConverter
internal.value.converter=io.confluent.connect.avro.AvroConverter
schema.registry.url=http://ip_address:8081
internal.key.converter.schema.registry.url=http://ip_address:8081
internal.value.converter.schema.registry.url=http://ip_address:8081

使用以下属性配置Kafka使用者代码:

Properties props = new Properties();
props.put("bootstrap.servers", "ip_address:9092");
props.put("zookeeper.connect", "ip_address:2181");
props.put("group.id", "test-consumer-group");
props.put("auto.offset.reset","smallest");
//Setting auto comit to false to ensure that on processing failure we retry the read
props.put("auto.commit.offset", "false");
props.put("key.converter.schema.registry.url", "ip_address:8081");
props.put("value.converter.schema.registry.url", "ip_address:8081");
props.put("schema.registry.url", "ip_address:8081");

在使用者实现中,以下是读取键和值组件的代码 . 我使用REST从Schema Registry获取密钥和值的模式 .

GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
return reader.read(null, DecoderFactory.get().binaryDecoder(byteData, null));

解析密钥工作正常 . 解析消息的值部分时,我得到ArrayIndexOutOfBoundsException .

下载了Avro的源代码并进行了调试 . 发现GenericDatumReader.readInt方法返回负值 . 该值应该是数组(符号)的索引,因此应该是正数 .

尝试使用kafka-avro-standalone-consumer消费事件,但它也抛出了一个ArrayIndexOutOfBoundsException . 所以,我的猜测是消息在Kafka connect( 生产环境 者)中编码不正确,问题在于配置 .

以下是问题:

  • 生产环境 者或消费者传递的配置有什么问题吗?

  • 为什么密钥反序列化工作但不是有 Value 的?

  • 还有什么需要做的工作吗? (比如在某处指定字符编码) .

  • 可以将Avro的Debezium用于 生产环境 ,还是现在的实验性功能? Debezium Avro上的帖子明确表示将来会包含涉及Avro的示例 .

有很多帖子,Avro反序列化引发了ArrayIndexOutOfBoundsException,但无法将其与我面临的问题联系起来 .

1 回答

  • 1

    按照_2540740中的步骤进行操作,现在工作正常 .

相关问题