我想加入一个
- KStream:从主题创建,主题具有JSON值 . 我使用值中的两个属性重新键入流 . 示例值(json的片段) . 我创建了一个自定义pojo类并使用自定义serdes .
{"value":"0","time":1.540753118800291E9,,"deviceIp":"111.111.111.111","deviceName":"KYZ1","indicatorName":"ifHCInOctets"}
键映射为:
map((key, value) -> KeyValue.pair(value.deviceName+value.indicatorName, value))
我查看了KStream并打印了我使用的键和属性 . 看起来都很好 .
- KTable:我从一个主题创建一个ktable,我使用python脚本写主题,主题的关键是
KYZ1ifHCInOctets
,设备名称和指标名称的组合(从上面) . 我做了一个toStream,然后看看结果流 . 键和值都很好 .
现在,当我进行内部联接并查看主题时,我看到键和值不匹配 . 加入似乎不起作用,
KStream<String, MyPojoClass> joined= datastream.join(table,
(data,table)->data
,Joined.with(Serdes.String(),myCustomSerde,Serdes.String())
);
key = XYZ1s1_TotalDiscards
Value = {"deviceName":"ABC2", "indicatorName":"jnxCosQstatTxedBytes"}
我有完全相同的事情通过ksql,但想做我自己的流应用程序 .
1 回答
现在听起来错误是多么愚蠢,我的PoJo类几乎没有静态属性:-(导致错误的键 .