
Apache Kafka Avro Deserialization: Unable to deserialize or decode Specific-type messages


I am trying to serialize/deserialize messages with Apache Kafka using Avro serialization. I created a producer that serializes a Specific-type message and sends it to the queue. The message reaches the queue successfully, but when our consumer picks it up and tries to process it, we get an exception while casting the bytes to our specific object. The exception is below:

[error] (run-main-0) java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.avroserializer.Customer
java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.avroserializer.Customer
    at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.lambda$infiniteConsumer$0(AvroSpecificDeserializer.java:51)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.infiniteConsumer(AvroSpecificDeserializer.java:46)
    at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.main(AvroSpecificDeserializer.java:63)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

Judging from the exception, we are reading the data in the wrong way. Our code is below:

Kafka producer code:

static {
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        kafkaProps.put("schema.registry.url", "http://localhost:8081");
        kafkaProducer = new KafkaProducer<>(kafkaProps);
    }

public static void main(String[] args) throws InterruptedException, IOException {
        Customer customer1 = new Customer(1002, "Jimmy");

        Parser parser = new Parser();
        Schema schema = parser.parse(AvroSpecificProducer.class
                .getClassLoader().getResourceAsStream("avro/customer.avsc"));

        SpecificDatumWriter<Customer> writer = new SpecificDatumWriter<>(schema);
        try(ByteArrayOutputStream os = new ByteArrayOutputStream()) {
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(os, null);
            writer.write(customer1, encoder);
            encoder.flush();

            byte[] avroBytes = os.toByteArray();

            ProducerRecord<String, byte[]> record1 = new ProducerRecord<>("CustomerSpecificCountry",
                    "Customer One 11 ", avroBytes
            );

            asyncSend(record1);
        }

        Thread.sleep(10000);
    }

Kafka consumer code:

static {
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "CustomerCountryGroup1");
        kafkaProps.put("schema.registry.url", "http://localhost:8081");
    }

    public static void infiniteConsumer() throws IOException {
        try(KafkaConsumer<String, byte[]> kafkaConsumer = new KafkaConsumer<>(kafkaProps)) {
            kafkaConsumer.subscribe(Arrays.asList("CustomerSpecificCountry"));

            while(true) {
                ConsumerRecords<String, byte[]> records = kafkaConsumer.poll(100);
                System.out.println("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<" + records.count());

                Schema.Parser parser = new Schema.Parser();
                Schema schema = parser.parse(AvroSpecificDeserializer.class
                        .getClassLoader().getResourceAsStream("avro/customer.avsc"));

                records.forEach(record -> {
                    DatumReader<Customer> customerDatumReader = new SpecificDatumReader<>(schema);
                    BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(record.value(), null);
                    try {
                        System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
                        Customer customer = customerDatumReader.read(null, binaryDecoder);
                        System.out.println(customer);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                });
            }

        }
    }

Using the console consumer we are able to receive the messages successfully. So what is the right way to decode the message into our POJO class?

2 Answers

  • 0

    The solution to this problem is to use

    DatumReader<GenericRecord> customerDatumReader = new SpecificDatumReader<>(schema);
    

    instead of

    DatumReader<Customer> customerDatumReader = new SpecificDatumReader<>(schema);
    

    The exact reason for this has still not been found. It may be because Kafka knows nothing about the structure of the message; we define the schema for it explicitly, and a GenericRecord is useful for turning any message into a readable, JSON-like form according to that schema. Once we have that, we can easily convert it into our POJO class.

    We still need to find a solution that deserializes directly into our POJO class; for now, the GenericRecord-to-POJO mapping can be done by hand, as in the sketch below.
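
    A minimal sketch of that mapping, under the assumption that customer.avsc declares an int field named id and a string field named name (matching the Customer(int, String) constructor used in the question); rather than going through JSON, it copies the fields from the GenericRecord straight into the POJO. Adjust the field names to your actual schema.

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.DatumReader;
    import org.apache.avro.io.DecoderFactory;

    import com.harmeetsingh13.java.avroserializer.Customer;

    import java.io.IOException;

    // Decode raw Avro bytes into a GenericRecord, then map its fields to the Customer POJO.
    // Field names "id" and "name" are assumptions; use the names from your customer.avsc.
    public static Customer toCustomer(byte[] avroBytes, Schema schema) throws IOException {
        DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(avroBytes, null);
        GenericRecord generic = reader.read(null, decoder);

        int id = (Integer) generic.get("id");
        // Avro often hands strings back as Utf8 instances, so normalize with toString().
        String name = generic.get("name").toString();
        return new Customer(id, name);
    }

    Inside the consumer loop this would be called as toCustomer(record.value(), schema).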

  • 0

    You don't need to perform the Avro serialization explicitly before passing the value to the ProducerRecord. The serializer will do it for you. Your code would look like this:

    Customer customer1 = new Customer(1002, "Jimmy");
    ProducerRecord<String, Customer> record1 = new ProducerRecord<>("CustomerSpecificCountry", customer1);
    asyncSend(record1);
    

    Take a look at Confluent's example of a simple producer using Avro. A sketch of the matching consumer-side configuration follows below.
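
    For completeness, on the consuming side the usual counterpart (assuming the producer sends the Customer object as shown above, and that Customer was generated by the Avro compiler from customer.avsc) is to keep KafkaAvroDeserializer and enable its specific.avro.reader setting, so it builds the generated Specific class instead of a GenericRecord. A minimal sketch, reusing the topic, group and hosts from the question:

    import io.confluent.kafka.serializers.KafkaAvroDeserializer;
    import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    import com.harmeetsingh13.java.avroserializer.Customer;

    import java.util.Arrays;
    import java.util.Properties;

    public static void consumeCustomers() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "CustomerCountryGroup1");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put("schema.registry.url", "http://localhost:8081");
        // The one extra setting compared to the question: return the generated Specific
        // class (Customer) instead of a GenericRecord.
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

        try (KafkaConsumer<String, Customer> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("CustomerSpecificCountry"));
            while (true) {
                ConsumerRecords<String, Customer> records = consumer.poll(100);
                records.forEach(r -> System.out.println(r.value()));
            }
        }
    }

    With that setting the records already arrive as Customer instances, so no manual DatumReader or decoder is needed in the consumer loop.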
