首页 文章

Kafka 流加入

提问于
浏览
7

我有2个kafka主题 - recommendationsclicks . 第一个主题具有由唯一ID(称为 recommendationsId )键入的建议对象 . 每个产品都有一个用户可以单击的URL .

clicks 主题获取通过向用户推荐的那些产品URL的点击生成的消息 . 它已设置为这些点击消息也由 recommendationId 键控 .

注意

  • 建议与点击之间的关系是一对多的 . 建议可能会导致多次点击,但点击始终与单个推荐相关联 .

  • 每个点击对象都有一个相应的推荐对象 .

  • 点击对象的时间戳晚于推荐对象 .

  • 推荐与相应点击之间的差距可能是几秒到几天(比如最多7天) .

我的目标是使用Kafka stream join加入这两个主题 . 我不清楚的是我是否应该使用KStream x KStream连接或KStream x KTable连接 .

我通过 recommendations table加入 clicks stream实现了 KStream x KTable join . 但是,如果建议是在加入者启动之前生成的,并且在加入者启动后点击到达,则无法看到任何加入的点击建议对 .

我使用正确的加入吗?我应该使用 KStream x KStream 加入吗?如果是这样,为了能够在过去7天内加入带有推荐的点击,我应该将窗口大小设置为7天吗?在这种情况下,我还需要设置"retention"期间吗?

我执行 KStream x KTable join的代码如下 . 请注意,我已经定义了类 RecommendationsClick 及其相应的serde . 点击消息只是简单的 String (网址) . 此URL字符串与 Recommendations 对象连接以创建 Click 对象,该对象将发送到 jointTopic .

public static void main(String[] args){
    if(args.length!=4){
      throw new RuntimeException("Expected 3 params: bootstraplist clickTopic recsTopic jointTopic");
    }

    final String booststrapList = args[0];
    final String clicksTopic = args[1];
    final String recsTopic = args[2];
    final String jointTopic = args[3];

    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my_joiner_id");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, booststrapList);
    config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, JoinSerdes.CLICK_SERDE.getClass().getName());

    KStreamBuilder builder = new KStreamBuilder();

    // load clicks as KStream
    KStream<String, String> clicksStream = builder.stream(Serdes.String(), Serdes.String(), clicksTopic);

    // load recommendations as KTable
    KTable<String, Recommendations> recsTable = builder.table(Serdes.String(), JoinSerdes.RECS_SERDE, recsTopic);

    // join the two
    KStream<String, Click> join = clicksStream.leftJoin(recsTable, (click, recs) -> new Click(click, recs));

    // emit the join to the jointTopic
    join.to(Serdes.String(), JoinSerdes.CLICK_SERDE, jointTopic);

    // let the action begin
    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();
  }

只要在运行木工(上述程序)后生成了建议和点击,这样就可以正常工作 . 但是,如果在网站运行之前生成推荐的点击到达,我看不到任何连接发生 . 我该如何解决?

如果解决方案是使用 KStream x KSTream join,那么请帮助我了解我应该选择哪个窗口大小以及选择的保留期限 .

1 回答

  • 7

    你的整体观察是正确的 . 从概念上讲,您可以通过两种方式获得正确的结果 . 如果使用流表连接,则有两个缺点(可能会在将来的Kafka版本中重新访问和改进)

    • 您已经提到过,如果在相应推荐之前处理了点击,则(内部)联接将失败 . 但是,如您所知,将有推荐,您可以使用左连接而不是内连接,检查连接结果,如果建议是 null ,则将click事件写回输入主题(即,您得到一个重试逻辑) - 当然,单个推荐的连续点击可能会出现故障,您可能需要在应用程序代码中考虑到这一点 .

    • KTable 的第二个缺点是,随着时间的推移,它会永远增长并且无限制,因为你会为它添加越来越多的独特建议 . 因此,您需要通过将 <recommendationsId, null> 形式的tombstones记录发送到推荐主题来实现一些"expiration logic",以删除您不再关心的旧推荐 .

    • 这种方法的优点是,与流 - 流连接相比,总共需要更少的内存/磁盘空间,因为您只需缓冲应用程序中的所有建议(但没有点击) .

    如果您使用流 - 流联接,并且在推荐后7天可能发生单击,则您的窗口大小必须为7天 - 否则,点击将不会与推荐一起加入 .

    • 这种方法的缺点是,您将需要更多的内存/磁盘,因为您将缓冲应用程序中过去7天的所有点击和所有建议 .

    • 优点是订单或处理(即推荐与点击)不再重要(即,您不需要像上面描述的那样实施重试策略)

    • 此外,旧的建议会自动过时,因此您不需要实现特殊的"expiration logic" .

    对于流 - 流加入,保留时间的答案略有不同 . 它必须至少7天,因为窗口大小是7天 . 否则,你会的删除“运行窗口”的记录 . 您还可以将保留期设置得更长,以便能够处理“延迟数据” . 假设用户在窗口时间范围结束时(推荐的7天时间 Span 前5分钟)点击,但点击仅在1小时后报告给您的应用程序 . 如果您的保留期限为7天作为您的窗口大小,则此迟到的记录将无法再处理(因为建议已被删除) . 如果您设置较长的保留期限,例如8天,您仍然可以处理延迟记录 . 这取决于您的应用程序/语义需要您想要使用的保留时间 .

    简介:从实现的角度来看,使用流 - 流连接比使用流表连接更简单 . 但是,预计可以节省内存/磁盘,并且可能会很大,具体取决于您的点击流数据速率 .

相关问题