我正在将 Spark Scala App Kafka API 升级到 v。0.10. 我曾经创建自定义方法来反序列化以字节字符串格式出现的消息。
我已经意识到有一种方法可以将 StringDeserializer 或 ByteArrayDeserializer 作为参数传递给键或值。
但是,我找不到有关如何创建自定义 Avro 架构反序列化器的任何信息,因此我的 kafkaStream 可以在 createDirectStream 和使用 Kafka 的数据时使用它。
可能吗?
我正在将 Spark Scala App Kafka API 升级到 v。0.10. 我曾经创建自定义方法来反序列化以字节字符串格式出现的消息。
我已经意识到有一种方法可以将 StringDeserializer 或 ByteArrayDeserializer 作为参数传递给键或值。
但是,我找不到有关如何创建自定义 Avro 架构反序列化器的任何信息,因此我的 kafkaStream 可以在 createDirectStream 和使用 Kafka 的数据时使用它。
可能吗?
1 回答
有可能的。您需要覆盖
org.apache.kafka.common.serialization
中定义的Deserializer<T>
接口,并且需要通过保存 Kafka 参数的ConsumerStrategy[K, V]
类将key.deserializer
或value.deserializer
指向您的自定义类。例如:然后: