首页 文章

如何将模式附加到kakfa主题以过滤json消息?

提问于
浏览
0

我是kafka和schema注册表的新手 . 我正在尝试为kafka主题强制执行json架构 . 因此,每当生成器生成json消息并将其推送到kafka主题时,它应该仅在符合模式的情况下才会通过 . 我使用的是avro架构格式 .

我已经使用汇合平台安装了模式注册表 . 我跟着这个 https://github.com/confluentinc/schema-registry

我有kaka主题“my_topic”,目前由 生产环境 者和消费者使用 . 我想确保在生成消息时维护架构 .

如何将托管在寄存器中的架构附加到kafka主题“my_topic”,以便在进入kafka之前对连接消息进行过滤?

schema-registry.properties 文件中是否需要任何配置?

1 回答

  • 0

    注意:引用Java API

    您的制作人应该使用Avro Serializer,而不是JSON序列化程序 . 该序列化程序只需要 ProducerRecord 类型中基于Avro的类,因此如果您使用Avro序列化程序发送JSON的字符串表示形式,那么Avro生成的模式实际上只是 "string" ,而不是您期望的任何AVSC格式 .

    如果您尝试验证JSON消息,如 kafka-avro-console-consumer 会这样做,您需要将JSON解析为GenericRecord或SpecificRecord,然后您可以找到架构是否不符合预期 .

    参考:source code

    String line = reader.readLine(); // Read line of JSON
      if (line == null) {
        return null;
      }
      if (!parseKey) {
        Object value = jsonToAvro(line, valueSchema); // Try to convert using Avro schema
        byte[] serializedValue = serializeImpl(valueSubject, value); // Get the bytes
        return new ProducerRecord<>(topic, serializedValue); // Make the record
      }
    

相关问题