首页 文章

Kafka Streams - 按时间戳/序列保留消息?

提问于
浏览
1

我在Kafka流上收到消息 . 它们由用户ID键入 . 当他们进来时,他们会得到一个序列号和时间戳 . 这些消息在15分钟后“过期” . 用户可以根据给定时间(最多15分钟)或序列请求新消息 .

我最初的东西是这样的:

`StreamsBuilder streamsBuilder = new StreamsBuilder();

KStream<String, Message> inboundStream = streamsBuilder.stream("incoming.topic");
  messageSupplier = Stores.persistentKeyValueStore("user.messages");

  KTable<String, MessageCache> messageTable = inboundStream
      .filter(this::userExists)
      .peek(this::recordInboundMessage)
      .map(this::markMessage)       // add sequence/timestamp
      .groupByKey()
      .aggregate(this::createMessageCache,
              this::addMessageToMessageCache,
              Materialized.as(messageSupplier));

  // ---> Some other setup stuff, then start the streams

`

MessageCache 保存消息列表(当我们将消息添加到缓存时删除过期的消息) . 当我收到消息请求时,我会浏览列表并过滤掉相应的消息 .

我以为我可以使用其中一种窗口策略,但找不到实际持久化消息列表的示例 .

这是最好的方法吗?或者我错过了一些会让这更容易/更好的东西?

1 回答

  • 0

    这是最好的方法吗?或者我错过了一些会让这更容易/更好的东西?

    我认为你有一个简单的解决方案,使用本机java类,有效地将流应用程序与你的代码连接起来......为了简单起见,有很多话要说!我能看到的唯一缺点是,如果您的事件速率过高,您的用户缓存可能会超出您的内存大小 . 此外,如果您需要容错,则流式应用程序将在另一个应用程序实例上重建状态存储的内容,以防出现故障 . 但如果这不是一个问题,那就去吧!

    但是,就如何在流应用程序上下文中执行此操作而言,您可以进行一些调整:

    • 定义要支持的用户查询的粒度 . 分钟?秒?我们为了争论而说几分钟 . 根据粒度窗口显示您的流 .

    • 定义一个累加器,类似于你所拥有的累加器,它将接受用户记录并将其添加到列表中 . 类似 UserRecordGroupUserRecordUserRecord 和方法 add(UserEvent evt) 会将 UserRecord 附加到 List .

    然后,您可以构建您的流应用程序,如:

    KStream<String, Message> inboundStream = streamsBuilder.stream("incoming.topic");
     Materialized<String, UserRecordGroup, WindowStore<Bytes, byte[]>> userStore =
     Materialized.<String, UserRecordGroup, WindowStore<Bytes,byte[]>>as("user.messages")
      .withValueSerde(/* your serializers here */);
    
    
    KTable<String, MessageCache> messageTable = inboundStream
      .filter(this::userExists)
      .peek(this::recordInboundMessage)
      .map(this::markMessage)       // add sequence/timestamp
      .groupByKey()
      .windowedBy(TimeWindows.of(ONE_MINUTE_IN_MS))
      .aggregate(UserRecordGroup::new,
                (key, value, agg) -> { agg.add(value); },
                 userStore);
    

    最后,当您想要从商店提供查询时,您可以

    ReadOnlyWindowStore<Integer, UserRecordGroup> store =
       streams.store("user.messages", QueryableStoreTypes.windowStore());
    WindowStoreIterator<UserRecordGroup> windowIterator = 
         store.fetch(pathHash, startTimestamp, endTimeStamp);
    

    您的迭代器将包含不同窗口的所有记录的列表;将这些列表合并在一起,您就可以获得startTimestamp和endTimestamp之间的用户活动的描述 .

相关问题