首页 文章

Kafka Streams - 与旧州的聚合

提问于
浏览
1

我有一个 KStream 来自主题 to1 的数据,如下所示:

T1-KEY -> {T1}
T2-KEY -> {T2}

KTable ,构造如下:

我正在使用org.apache.kafka.streams.StreamsBuilder从某个主题 to2 创建 KTable ,如下所示:

A1-KEY -> { "A1", "Set": [
                          {"B1", "Rel": "T1"},
                          {"B2", "Rel": "T1"}
                         ]
          } 

..

然后,流被平面映射并按Key s.t分组 . 结果 KTable 看起来像这样:

T1 -> { ["B1", "B2"] }

稍后,现在主题 to2 中出现以下消息:

A1-KEY -> { "A1", "Set": [
                          {"B2", "Rel": "T1"}
                         ]
          }

现在我希望我的 KTable 能够反映出这些变化,看起来像这样:

T1 -> { ["B2"] }

但它看起来像这样:

T1 -> { ["B1", "B2"] }

我注意到,在我的 Aggregator<Tx-KEY, Bx, Set<Bx>> 中给出的最后一个参数是set ["B1", "B2"] ,即使我在聚合之前偷看我只得到一个匹配 "B2" .

我是否理解汇总错误或此处发生了什么?

EDIT

我认为我缩小了它:显然聚合的 Initializer 仅在第一次被调用 - 之后聚合总是接收 last aggregate 作为最后一个参数,例如

@Override
public Set<Bx> apply(Tx-KEY, Bx value, Set<Bx> aggregate) {

}

其中 Set<Bx> aggregate 在第一次调用时是 [] (通过初始化程序创建),而 ["B1", "B2"] 用于第二次调用 .

有任何想法吗?

EDIT 2

public class MyAggregator implements Aggregator<Tx-KEY, Bx, Set<Bx>> {

    @Override
    public Set<Bx> apply(Tx-KEY key, Bx value, Set<Bx> aggregate) {
        aggregate.add(value);
        return aggregate;
    }
}

EDIT 3

我不能只是平面 Map ,因为我必须组合多个Ax元素,例如

A1-KEY -> { "A1", "Set": [
                      {"B1", "Rel": "T1"}
                     ]
          },
A2-KEY -> { "A2", "Set": [
                      {"B2", "Rel": "T1"}
                     ]
          },
...

在那里我期待一些小组的喜欢

T1 -> { ["B1", "B2"] }

并在下一次迭代中,当消息

A1-KEY -> { "A1", "Set": [
                      {"B1", "Rel": "T1"}
                     ]
          }

到达我想到的

T1 -> { ["B1"] }

..

1 回答

  • 1

    请注意,在聚合器中,您只是将元素添加到聚合集中 . 有了这个逻辑,你的集合(对于给定的密钥)永远不会缩小 . 在这种情况下,我认为你已经过多地压扁了流 . 我建议你不要将它压扁到你的消息的形式 (Tx-KEY key, Bx value) ,而是让它们始终保留它们的设置形式: (Tx-KEY key, Set<Bx> value) . 根本不需要聚合 . 为了达到这个目的,我建议你改变输入集

    "Set": [
         {"B1", "Rel": "T1"},
         {"B2", "Rel": "T1"}
    ]
    

    T1 -> { ["B1", "B2"] }
    

    通过在KStream flatmap方法调用中使用标准java代码(Collections或Streams api)按"Rel"字段进行分组,这样您就只能在KStream上发送带有 Set<Bx> -typed值的消息,而不是单独发送 Bx -typed值 .

    如果您提供当前flatmap实现的代码,很高兴详细说明 .

相关问题