首页 文章

Kafka Streams - 减少大型国营商店的内存占用

提问于
浏览
1

我有一个拓扑(见下文),它读取了一个非常大的主题(每天超过十亿条消息) . 这个Kafka Streams应用程序的内存使用率非常高,我正在寻找一些关于如何减少州商店足迹的建议(详情如下) . Note: 我不是想把山羊放到国营商店,我只是觉得可能有办法改善我的拓扑结构 - 见下文 .

// stream receives 1 billion+ messages per day
stream
    .flatMap((key, msg) -> rekeyMessages(msg))
    .groupBy((key, value) -> key)
    .reduce(new MyReducer(), MY_REDUCED_STORE)
    .toStream()
    .to(OUTPUT_TOPIC);

// stream the compacted topic as a KTable
KTable<String, String> rekeyedTable = builder.table(OUTPUT_TOPIC, REKEYED_STORE);


// aggregation 1
rekeyedTable.groupBy(...).aggregate(...)

// aggreation 2
rekeyedTable.groupBy(...).aggregate(...)

// etc

更具体地说,我想知道将 OUTPUT_TOPIC 作为KTable流式传输是否会导致状态存储( REKEYED_STORE )大于本地需要的状态存储 . 对于具有大量唯一键的changelog主题,最好将它们作为 KStream 流式传输并进行窗口化聚合吗?或者这不会像我想的那样减少占用空间(例如,只有一部分记录 - 窗口中的那些记录将存在于本地状态存储中) .

无论如何,我总是可以启动这个应用程序的更多实例,但我想让每个实例尽可能高效 . 这是我的问题:

  • 是否有任何配置选项,一般策略等应该考虑具有此级别吞吐量的Kafka Streams应用程序?

  • 是否存在单个实例应具有的内存密集程度的指导原则?即使您有一些武断的指南,与他人分享可能会有所帮助 . 我的一个实例目前正在使用15GB的内存 - 我不知道是否重要_2410704 .

任何帮助将不胜感激!

1 回答

  • 5

    用你当前的模式

    stream.....reduce(.)toStream().to(OUTPUT_TOPIC);
    builder.table(OUTPUT_TOPIC, REKEYED_STORE)
    

    你得到两个内容相同的商店 . 一个用于 reduce() 运算符,另一个用于读取 table() - 这可以减少到一个商店:

    KTable rekeyedTable  = stream.....reduce(.);
    rekeyedTable.toStream().to(OUTPUT_TOPIC); // in case you need this output topic; otherwise you can also omit it completely
    

    这应该会显着降低内存使用量 .

    关于窗口与非窗口:

    • 这是你所需语义的问题;如此简单的从非窗口切换到窗口缩小似乎是值得怀疑的 .

    • 即使你也可以使用窗口语义,你也不一定会减少内存 . 注意,在聚合情况下,Streams不存储原始记录,而只存储当前聚合结果(即key currentAgg) . 因此,对于单个密钥,两种情况的存储要求是相同的(单个窗口具有相同的存储要求) . 同时,如果你使用Windows,你可能实际上需要更多的内存,因为你得到一个聚合的专业密钥专业窗口(而在非窗口的情况下你只得到一个聚合的专业密钥) . 您可以节省内存的唯一方案是,您的“密钥空间”会在很长一段时间内展开 . 例如,您可能无法长时间获取某些键的任何输入记录 . 在非窗口的情况下,这些记录的聚合将一直存储,而对于窗口化的情况,如果具有此键的记录稍后发生,则将删除密钥/聚合记录并重新创建新的记录 . 再次(但请记住,在这种情况下你丢失了以前的aggergate - cf.(1))

    最后但同样重要的是,您可能需要查看调整应用程序大小的指南:http://docs.confluent.io/current/streams/sizing.html

相关问题