我有一个使用kafka connect的 生产环境 者,它使用Confluent Kafka Connect API,它以“SourceRecord”格式发布消息,其中包含“schema”和“struct”,如下所示 .
我正在寻找一个示例代码来在scala中构建一个kafka使用者,它消费该消息并将其反序列化为一个对象
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
//publish kafka message in avro format
protected SourceRecord makeSourceRecord(AvroDataEvent avroDataEvent) {
return new SourceRecord(
partitionKey(config.sourceJdbcUrl),
config.topicName,
avroDataEvent.schema(),
avroDataEvent.struct());
}
1 回答
您可以使用Confluent KafkaAvroDeserializer类以及使用Connector配置的架构注册表直接从
config.topicName
主题使用仅仅因为来自Connect的数据不需要使用Connect API来读取它 .
关于示例代码,请尝试将其作为起点(在Kotlin中)http://aseigneurin.github.io/2018/08/03/kafka-tutorial-5-consuming-avro.html