首页 文章

使用Avro的单个Kafka主题中的多个消息类型

提问于
浏览
1

我有一个基于Kafka的事件源应用程序 . 目前我有一个主题,其中包含多种消息类型 . 全部使用JSON进行序列化/反序列化 .

融合的模式注册表看起来像是一种很好的消息类型维护方法,并且在Avro完全兼容模式下,它还提供了一种在我的事件源应用程序中发送消息版本控制的机制 .

最近patch - blog post到4.1.1汇合 . 使用Avro序列化程序/反序列化程序,您可以在一个主题中拥有多种不同类型的消息 .

但是,我还没有看到任何这方面的实例 . 甚至没有一个 .

我的问题是:上述补丁是否真的有效,而不必使用Avro联盟类型(将所有不同类型的消息放在一个单一的模式中并利用联合)?

这种方法如何与Kafka Streaming应用程序一起使用,您需要在其中指定Key和Value Serde?

我应该忘记Avro而只是选择protobuff吗?

1 回答

  • 0

    这是消费者从主题中获取数据的示例,其中发布了不同类型的事件:

    package com.kafka.schema;
    
    import com.phonebook.Employee;
    import com.phonebook.Milestone;
    import io.confluent.kafka.serializers.KafkaAvroDeserializer;
    import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
    import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
    import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.LongDeserializer;
    
    import java.time.Duration;
    import java.time.temporal.ChronoUnit;
    import java.util.Collections;
    import java.util.Properties;
    import java.util.stream.IntStream;
    
    public class AvroConsumer {
    
        private static Consumer<Long, GenericRecord> createConsumer() {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Const.BOOTSTRAP_SERVERS);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleAvroConsumer");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
            // Use Kafka Avro Deserializer.
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    
            // Use Specific Record or else you get Avro GenericRecord.
            // props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
    
            // Schema registry location.
            // Run Schema Registry on 8081
            props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, Const.SCHEMA_REGISTRY);
            props.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName());
            return new KafkaConsumer<>(props);
        }
    
        public static void main(String... args) {
            final Consumer<Long, GenericRecord> consumer = createConsumer();
            consumer.subscribe(Collections.singletonList(Const.TOPIC));
            IntStream.range(1, 100).forEach(index -> {
                final ConsumerRecords<Long, GenericRecord> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
                if (records.count() == 0) {
                    System.out.println("None found");
                } else {
                    records.forEach(record -> {
                        GenericRecord recValue = record.value();
                        System.out.printf("%s %d %d %s \n", record.topic(), record.partition(), record.offset(), recValue);
                    });
                }
            });
        }
    }
    

    这里的重要部分是:

    props.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName());
    

相关问题