我正在尝试在Kafka streams之上实现一个简单的CQRS /事件源概念验证(如https://www.confluent.io/blog/event-sourcing-using-apache-kafka/中所述)
我有4个基本部分:
-
commands
topic,它使用聚合ID作为按顺序处理每个聚合命令的键 -
events
主题,发布聚合状态的每个更改(同样,密钥是聚合ID) . 本主题的保留策略为"never delete" -
A KTable减少聚合状态并将其保存到状态存储
events topic stream ->
group to a Ktable by aggregate ID ->
reduce aggregate events to current state ->
materialize as a state store
- 命令处理器 - 命令流,左连接聚合状态KTable . 对于结果流中的每个条目,使用函数
(command, state) => events
生成结果事件并将其发布到events
主题
问题是 - 有没有办法确保我在州商店中拥有最新版本的聚合?
如果违反业务规则,我想拒绝命令(例如,如果实体被标记为已删除,则修改实体的命令无效) . 但是如果发布 DeleteCommand
后跟 ModifyCommand
,则删除命令将生成 DeletedEvent
,但是当处理 ModifyCommand
时,状态存储中的加载状态可能尚未反映,并且将发布冲突事件 .
我不介意牺牲命令处理吞吐量,我宁愿得到一致性保证(因为所有内容都按相同的密钥分组,最终应该在同一个分区中)
希望很清楚:)有什么建议吗?
3 回答
我不认为Kafka对CQRS和事件采购有好处,就像你描述的那样,因为它缺乏(简单)方法来确保防止并发写入 . 这个article详细讨论了这一点 .
我所说的方式是指您希望命令生成零个或多个事件或因异常而失败的事实;这是带有事件采购的经典CQRS . 大多数人都期望这种架构 .
您可以采用不同风格的事件采购 . 您的命令处理程序可以为收到的每个命令(即
DeleteWasAccepted
)生成事件 . 然后,事件处理程序最终可以以事件源方式处理该事件(通过从其事件流重建Aggregate的状态)并发出其他事件(即ItemDeleted
或ItemDeletionWasRejected
) . 因此,命令被发送 - 忘记,发送异步,客户端不等待立即响应 . 然而,它等待一个描述其命令执行结果的事件 .一个重要的方面是事件处理程序必须以串行方式处理来自同一聚合的事件(恰好一次且按顺序) . 这可以使用单个Kafka Consumer Group来实现 . 你可以在这个video中看到这个架构 .
我想出的一个可能的解决方案是实现一种乐观的锁定机制:
在命令上添加
expectedVersion
字段使用KTable
Aggregator
增加每个已处理事件的聚合快照的版本如果
expectedVersion
不是't match the snapshot'的聚合版本,则拒绝命令这似乎提供了我正在寻找的语义
请我的同事Jesper阅读这篇文章 . Kafka是一款出色的产品,但实际上并不适合采购活动
https://medium.com/serialized-io/apache-kafka-is-not-for-event-sourcing-81735c3cf5c