首页 文章

kafka stream to ktable join

提问于
浏览
0

我想加入一个

  • KStream:从主题创建,主题具有JSON值 . 我使用值中的两个属性重新键入流 . 示例值(json的片段) . 我创建了一个自定义pojo类并使用自定义serdes . {"value":"0","time":1.540753118800291E9,,"deviceIp":"111.111.111.111","deviceName":"KYZ1","indicatorName":"ifHCInOctets"}

键映射为:

map((key, value) -> KeyValue.pair(value.deviceName+value.indicatorName, value))

我查看了KStream并打印了我使用的键和属性 . 看起来都很好 .

  • KTable:我从一个主题创建一个ktable,我使用python脚本写主题,主题的关键是 KYZ1ifHCInOctets ,设备名称和指标名称的组合(从上面) . 我做了一个toStream,然后看看结果流 . 键和值都很好 .

现在,当我进行内部联接并查看主题时,我看到键和值不匹配 . 加入似乎不起作用,

KStream<String, MyPojoClass> joined= datastream.join(table, 
          (data,table)->data
          ,Joined.with(Serdes.String(),myCustomSerde,Serdes.String())
          );

key = XYZ1s1_TotalDiscards
Value = {"deviceName":"ABC2", "indicatorName":"jnxCosQstatTxedBytes"}

我有完全相同的事情通过ksql,但想做我自己的流应用程序 .

1 回答

  • 0

    现在听起来错误是多么愚蠢,我的PoJo类几乎没有静态属性:-(导致错误的键 .

相关问题