首页 文章

加入Kafka流中的外键

提问于
浏览
0

假设我有三个Kafka主题,其中包含代表不同聚合(事件采购应用程序)中发生的业务事件的事件 . 这些事件允许使用以下属性构建聚合:

  • users:usedId,name
    应用程序的

  • 个模块:moduleId,name

  • 用户模块的授权:grantId,userId,moduleId,scope

现在我想创建一个包含用户和产品名称(而不是id)的所有授权流 . 我想这样做:

  • 通过userId对事件进行分组,为用户创建KTable . KTable将userId作为键 . 没关系 .

  • 通过按productId对事件进行分组,为产品创建KTable . KTable将productId作为关键 . 没关系 .

  • 从Grants流创建一个流并加入两个KTable . 没关系 . 问题是连接似乎只能在主键上使用 . 但是流的关键是Grant的技术标识符,用户和产品表的密钥不是(它们与Grant无关) .

那怎么办?

1 回答

  • 1

    嗯,目前在Kafka Streams没有直接支持外键加入 .
    有一个开放的KIP:https://issues.apache.org/jira/browse/KAFKA-3705为相同 .

    目前,可以有解决此问题的解决方法 . 你可以使用 KStream-KTable Join .

    首先将 User Stream和 Module Stream聚合到具有聚合事件集合的相应KTable中 .

    KTable<String,Object> UserTable = userStream.groupBy(<UserId>).aggregate(<... build collection/latest event>) ;
    KTable<String,Object> ModuleTable = moduleStream.groupBy(<ModuleId>).aggregate(<... build collection/latest event>);
    

    现在选择moduleID作为 Grants 流中的键 .

    KStream<String,Object> grantRekeyedStream = grantStream.selectKey(<moduleId>);
    

    它会将密钥更改为 moduleId . 现在,您可以使用 ModuleTable 执行Stream-Table Join . 它会将右侧的所有匹配记录连接到左侧的键 . 结果流将 GrantModule 数据放入一个流中,并以 ModuleId 为键 .

    KStream<String,Object> grantModuleStream = grantRekeyedStream.join(moduleTable);
    

    下一步是加入 userTable . 因此,您需要使用 userId 重新键入 grantModuleTable .

    KStream<String,Object> grantModuleRekeyedStream = grantModuleTable.selectKey(<Select UserId>);
    

    现在 grantModuleRekeyedStream 可以与 userTable 连接 KStream-KTable Join

    KStream<String,Object> grantModuleUserStream = grantModuleRekeyedStream .join(userTable);
    

    Stream上方将使用用户ID作为密钥,并包含该用户的所有授权和模块详细信息 .

相关问题