首页 文章

avro.io.AvroTypeException:数据<avro data>不是模式的示例{...}

提问于
浏览
0

我们正在努力将Apache Storm与Kafka的Confluent框架集成在一起 . 我们正在使用一个名为“Pyleus”的python风暴包装器

我们设置了一个监控数据库表的Confluent-Kafka JDBC连接器,每当DB发生变化时,新记录将以Avro格式发送为Kafka消息 .

在Pyleus bolt中,我们能够获得Kafka消息,但是,我们无法将其反序列化为JSON .

我们使用两个名为“avro_json_serializer”和“avro”的python-Avro模块 . 当我尝试反序列化我放在一起的简单Avro文件时,它们可以工作 .

通过使用HTTP GET从Confluent的架构注册表获取Kafka消息中Avro数据的Avro架构 . 我将Kafka消息中的模式和Avro数据放入两个文件中,这是我的测试程序:

import avro
import avro_json_serializer as ajs

import json

# Avro schema from Confluent's schema registry using HTTP GET
schema_string = open("realAvroSchemaFromKK.avsc").read()

schema_dict = json.loads(schema_string)
avro_schema = avro.schema.make_avsc_object(schema_dict, avro.schema.Names())

serializer = ajs.AvroJsonSerializer(avro_schema)

# Avro data with in Kafka message - I wrote it into this file
avrofile = open("realAvroFromKK.avro", "r")
avro = avrofile.read()

jsonData = serializer.to_json(avro) # where the code error out #

print jsonData

我解释错误消息的方式是我的avro架构不适合我的avro数据:

avro.io.AvroTypeException: The datum �bankbankHoward �����THoward �����T� is not an example of the schema {
  "namespace": "example.avro",
  "type": "record",
  "connect.name": "TABLE_NAME",
  "fields": [
    {
      "type": "int",
      "name": "Column_1"
    },
    ... (omitting the rest of the schema)

我从here读到,来自Confluent框架的Avro格式的Kafka消息在消息开头有4个额外的字节,表示模式ID . 我试图删除Avro数据的前4个字节然后将其发送到"serializer.to_json()"但仍然没有运气 .

救命!

1 回答

  • 0

    我遇到了类似的问题,我正在通过Storm Kafka喷口阅读kafka融合数据 . 这是对我有用的等效Java代码 .

    ByteBuffer input = ByteBuffer.wrap(data);
        int id = input.getInt();
        int start = input.position() + 1;
        MyAvroObject obj = null;
        try {
            obj  = datum_reader.read(null, DecoderFactory.get().binaryDecoder(input.array(), start, input.limit(), null));
    
        } catch (IOException e) {
            e.printStackTrace();
        }
        return obj;
    

    ByteBuffer上的getInt()和position方法在模式Id之后移动指针 . 希望这可以帮助 .

相关问题