我有一个 KStream 来自主题 to1 的数据,如下所示:
T1-KEY -> {T1}
T2-KEY -> {T2}
和 KTable ,构造如下:
我正在使用org.apache.kafka.streams.StreamsBuilder从某个主题 to2 创建 KTable ,如下所示:
A1-KEY -> { "A1", "Set": [
{"B1", "Rel": "T1"},
{"B2", "Rel": "T1"}
]
}
..
然后,流被平面映射并按Key s.t分组 . 结果 KTable 看起来像这样:
T1 -> { ["B1", "B2"] }
稍后,现在主题 to2 中出现以下消息:
A1-KEY -> { "A1", "Set": [
{"B2", "Rel": "T1"}
]
}
现在我希望我的 KTable 能够反映出这些变化,看起来像这样:
T1 -> { ["B2"] }
但它看起来像这样:
T1 -> { ["B1", "B2"] }
我注意到,在我的 Aggregator<Tx-KEY, Bx, Set<Bx>>
中给出的最后一个参数是set ["B1", "B2"]
,即使我在聚合之前偷看我只得到一个匹配 "B2"
.
我是否理解汇总错误或此处发生了什么?
EDIT
我认为我缩小了它:显然聚合的 Initializer
仅在第一次被调用 - 之后聚合总是接收 last aggregate
作为最后一个参数,例如
@Override
public Set<Bx> apply(Tx-KEY, Bx value, Set<Bx> aggregate) {
}
其中 Set<Bx> aggregate
在第一次调用时是 []
(通过初始化程序创建),而 ["B1", "B2"]
用于第二次调用 .
有任何想法吗?
EDIT 2
public class MyAggregator implements Aggregator<Tx-KEY, Bx, Set<Bx>> {
@Override
public Set<Bx> apply(Tx-KEY key, Bx value, Set<Bx> aggregate) {
aggregate.add(value);
return aggregate;
}
}
EDIT 3
我不能只是平面 Map ,因为我必须组合多个Ax元素,例如
A1-KEY -> { "A1", "Set": [
{"B1", "Rel": "T1"}
]
},
A2-KEY -> { "A2", "Set": [
{"B2", "Rel": "T1"}
]
},
...
在那里我期待一些小组的喜欢
T1 -> { ["B1", "B2"] }
并在下一次迭代中,当消息
A1-KEY -> { "A1", "Set": [
{"B1", "Rel": "T1"}
]
}
到达我想到的
T1 -> { ["B1"] }
..
1 回答
请注意,在聚合器中,您只是将元素添加到聚合集中 . 有了这个逻辑,你的集合(对于给定的密钥)永远不会缩小 . 在这种情况下,我认为你已经过多地压扁了流 . 我建议你不要将它压扁到你的消息的形式
(Tx-KEY key, Bx value)
,而是让它们始终保留它们的设置形式:(Tx-KEY key, Set<Bx> value)
. 根本不需要聚合 . 为了达到这个目的,我建议你改变输入集成
通过在KStream flatmap方法调用中使用标准java代码(Collections或Streams api)按"Rel"字段进行分组,这样您就只能在KStream上发送带有
Set<Bx>
-typed值的消息,而不是单独发送Bx
-typed值 .如果您提供当前flatmap实现的代码,很高兴详细说明 .