我是kafka和schema注册表的新手 . 我正在尝试为kafka主题强制执行json架构 . 因此,每当生成器生成json消息并将其推送到kafka主题时,它应该仅在符合模式的情况下才会通过 . 我使用的是avro架构格式 .
我已经使用汇合平台安装了模式注册表 . 我跟着这个 https://github.com/confluentinc/schema-registry
我有kaka主题“my_topic”,目前由 生产环境 者和消费者使用 . 我想确保在生成消息时维护架构 .
如何将托管在寄存器中的架构附加到kafka主题“my_topic”,以便在进入kafka之前对连接消息进行过滤?
schema-registry.properties
文件中是否需要任何配置?
1 回答
注意:引用Java API
您的制作人应该使用Avro Serializer,而不是JSON序列化程序 . 该序列化程序只需要
ProducerRecord
类型中基于Avro的类,因此如果您使用Avro序列化程序发送JSON的字符串表示形式,那么Avro生成的模式实际上只是"string"
,而不是您期望的任何AVSC格式 .如果您尝试验证JSON消息,如
kafka-avro-console-consumer
会这样做,您需要将JSON解析为GenericRecord或SpecificRecord,然后您可以找到架构是否不符合预期 .参考:source code