我 Build 了一个Spark-Streaming管道,通过Kafka获取测量数据 . 该数据使用Avro序列化 . 数据可以有两种类型 - EquidistantData
和 DiscreteData
. 我使用 avdl
文件和 sbt-avrohugger
插件创建了这些文件 . 我使用生成从 SpecificRecord
继承的Scala案例类的变体 .
在我的接收应用程序中,我可以通过查询 EquidistantData.SCHEMA$
和 DiscreteData.SCHEMA$
来获取这两个模式 .
现在,我的Kafka流为我提供了值类为 Array[Byte]
的RDD . 到现在为止还挺好 .
如何从字节数组中找出序列化时使用的模式,即是否使用 EquidistantData.SCHEMA$
或 DiscreteData.SCHEMA$
?
我想到在消息密钥中发送适当的信息 . 目前,我不使用消息密钥 . 这是一种可行的方法,还是可以从我收到的序列化字节数组中以某种方式获取模式?
跟进:
另一种可能性是对离散和等距数据使用单独的主题 . 这可行吗?