首页 文章

如何反转多对多关系?

提问于
浏览
2

我有一个压缩的Kafka主题是一个实体流,它具有我想要反转的多对多关系中该实体的最新表示 .

一个示例是 Author 对象的主题,其中主题键是 Author.id (AAA),值是“Book”标识符值的数组:

"AAA" -> {"books": [456]}

Author 写入ID为 333 的新 Book 时,具有相同键的新事件将使用更新的书籍列表写入流:

"AAA" -> {"books": [456, 333]}

Book 也可能有多个 Authors ,因此相同的 Book 标识符可能出现在另一个事件中:

"BBB" -> {"books": [333, 555]}

我想使用kafka流将其反转为 Books -> [Author] 的流,因此上述事件将导致类似于:

456 -> {"authors": ["AAA"]}
333 -> {"authors": ["AAA", "BBB"]}
555 -> {"authors": ["BBB"]}

当我再次启动我的应用程序时,我希望恢复状态,这样如果我在另一个 Author 记录中读取它将反转appropriatley的关系 . 所以这:

"CCC" -> {"books": [555]}

会知道 "BBB" 也是 Author 并会发出更新的事件:

555 -> {"authors": ["BBB", "CCC"]}

我一直在关注 GlobalKTable ,它在本地读取完整的主题状态,但无法弄清楚如何让它反转关系并将值聚合在一起 .

如果可以的话,我想我可以用事件流加入 GlobalKTable ,并为每个 Book 获取 Author 的完整列表 .

1 回答

  • 3

    您不必使用 GlobakKTable 来达到您的要求 . 在Kafka Streams中,由更改密钥引起的内部数据重新分配会自动发生 . 例如 :

    orgKStream
      .flatMapValues(books -> getBookList) (1)
      .map((k,v) -> new KeyValue<>(v, k))  (2)
      .groupByKey()                        (3)
      .aggregate(//aggregate author list ) (4)
      .toStream(// sink topic)             (5)
    

    (1)将改变您的原始主题,如下所示 .

    <before>
    "AAA" -> {"books": [456, 333]}
    "BBB" -> {"books": [333, 555]}
    <after>
    "AAA" -> 456
    "AAA" -> 333
    "BBB" -> 333
    "BBB" -> 555
    

    (2)将值替换为键 .

    <after>
    456 -> "AAA"
    333 -> "AAA"
    333 -> "BBB" 
    555 -> "BBB"
    

    (3)和(4)将聚合并生成KTable(和状态存储)

    <after>
    456 -> {"authors": ["AAA"]}
    333 -> {"authors": ["AAA", "BBB"]}
    555 -> {"authors": ["BBB"]}
    

    (5)将表中的整个记录写入给定的主题 .

    现在,您有一个新主题,其中包含book作为键,作者列表作为值 . 如果您想将整个结果放在一个地方,现在只需创建如下所示的GlobalKTable .

    StreamsBuilder.globalTable(<sink topic>)
    

    如果调用(2)(map)然后调用(3)(groupByKey),将发生通过重新分区主题的内部数据重新分配 . 这意味着具有相同书籍ID的所有记录将被发布到内部重新分区主题的同一分区中 . 因此,您不会丢失任何聚合数据 .

相关问题