我在Kafka流上收到消息 . 它们由用户ID键入 . 当他们进来时,他们会得到一个序列号和时间戳 . 这些消息在15分钟后“过期” . 用户可以根据给定时间(最多15分钟)或序列请求新消息 .
我最初的东西是这样的:
`StreamsBuilder streamsBuilder = new StreamsBuilder();
KStream<String, Message> inboundStream = streamsBuilder.stream("incoming.topic");
messageSupplier = Stores.persistentKeyValueStore("user.messages");
KTable<String, MessageCache> messageTable = inboundStream
.filter(this::userExists)
.peek(this::recordInboundMessage)
.map(this::markMessage) // add sequence/timestamp
.groupByKey()
.aggregate(this::createMessageCache,
this::addMessageToMessageCache,
Materialized.as(messageSupplier));
// ---> Some other setup stuff, then start the streams
`
MessageCache
保存消息列表(当我们将消息添加到缓存时删除过期的消息) . 当我收到消息请求时,我会浏览列表并过滤掉相应的消息 .
我以为我可以使用其中一种窗口策略,但找不到实际持久化消息列表的示例 .
这是最好的方法吗?或者我错过了一些会让这更容易/更好的东西?
1 回答
我认为你有一个简单的解决方案,使用本机java类,有效地将流应用程序与你的代码连接起来......为了简单起见,有很多话要说!我能看到的唯一缺点是,如果您的事件速率过高,您的用户缓存可能会超出您的内存大小 . 此外,如果您需要容错,则流式应用程序将在另一个应用程序实例上重建状态存储的内容,以防出现故障 . 但如果这不是一个问题,那就去吧!
但是,就如何在流应用程序上下文中执行此操作而言,您可以进行一些调整:
定义要支持的用户查询的粒度 . 分钟?秒?我们为了争论而说几分钟 . 根据粒度窗口显示您的流 .
定义一个累加器,类似于你所拥有的累加器,它将接受用户记录并将其添加到列表中 . 类似
UserRecordGroup
的UserRecord
的UserRecord
和方法add(UserEvent evt)
会将UserRecord
附加到List
.然后,您可以构建您的流应用程序,如:
最后,当您想要从商店提供查询时,您可以
您的迭代器将包含不同窗口的所有记录的列表;将这些列表合并在一起,您就可以获得startTimestamp和endTimestamp之间的用户活动的描述 .