首页 文章

CQRS存储库/事件发布者

提问于
浏览
7

我正在使用CqrsLite进行CQRS式项目 . 具体Repository实现的Save方法如此(省略了不相关的行) .

public void Save<T>(T aggregate, int? expectedVersion = null) where T : AggregateRoot
    {
        if (expectedVersion != null && _eventStore.Get(typeof(T), aggregate.Id, expectedVersion.Value).Any())
            throw new ConcurrencyException(aggregate.Id);

        var i = 0;
        foreach (var @event in aggregate.GetUncommittedChanges())
        {
            // ... [irrelevant code removed] ...
            _eventStore.Save(typeof(T), @event);
            _publisher.Publish(@event);
        }
        aggregate.MarkChangesAsCommitted();
    }

令我不安的是,这个方法是在聚合被告知将其标记为已提交之前提交要发布给订阅者的事件 . 因此,如果观察到给定事件的事件处理程序阻塞,则聚合将不会提交先前事件处理程序可能已被通知的已提交更改 .

为什么我不会将_publisher.Publish(@event)移动到aggregate.MarkChangesAsCommitted()之后,就像这样 . 我错过了什么?

public void Save<T>(T aggregate, int? expectedVersion = null) where T : AggregateRoot
    {
        if (expectedVersion != null && _eventStore.Get(typeof(T), aggregate.Id, expectedVersion.Value).Any())
            throw new ConcurrencyException(aggregate.Id);

        var events = aggregate.GetUncommittedChanges();
        foreach (var @event in events)
        {
            // ... [irrelevant code removed] ...
            _eventStore.Save(typeof(T), @event);
        }
        aggregate.MarkChangesAsCommitted();
        _publisher.Publish(events);
    }

3 回答

  • 1

    这两种方法都存在问题,因为无论调用这两种方法的顺序如何, SavePublish 之间可能存在错误 . 这可能导致未发布的事件被发布或保存的事件未被发布 . 内存状态损坏(在聚合对象中)的问题也存在(尽管可以通过简单地捕获事件处理程序产生的错误来处理) .

    该问题的一个解决方案是使用两阶段提交(例如,如果您的事件存储是基于SQL Server且发布者是基于MSMQ的,则可用) . 但是,这具有性能,可伸缩性和操作含义,并且不允许后期订阅者(见下文) .

    更好的方法是允许对事件感兴趣的各方将它们从事件存储中拉出(理想情况下,将其与某种通知机制相结合或长时间轮询以使其更多"reactive") . 这将跟踪最后收到的事件的责任转移给订户,允许

    • 迟到的订阅者(在存储事件后很久加入)接收旧事件以及新事件,
      没有两阶段提交的
    • 可靠性 .

    在搜索“使用事件存储作为队列”之类的内容时,您应该找到更多有关此方法的信息,而Greg的答案中的视频可能也会为此添加很多内容 .

    一个常见的算法就是这个:

    • 事件存储为每个保存的事件分配一个检查点令牌(例如,序列号);

    • 订阅者向事件存储区询问新事件(定期,基于长轮询,对推送通知作出反应等),从他们知道的最后一个检查点令牌(如果有的话)开始,

    • 事件存储从该检查点令牌开始发送较新的事件以及新的检查点令牌,

    • 订阅者处理事件,并且如果可能的话,以原子方式存储新的检查点令牌以及它们产生的任何副作用;

    • 如果无法进行原子保存,他们可以在产生副作用后存储新的检查点令牌,并且他们需要一种方法来忽略他们已经看到的事件,以防中间存在错误(事件处理被认为是"idempotent");

    • 订阅者再次从#2开始 .

    我认为事件存储忽略了 生产环境 就绪的 Save / Publish 问题 . 有关替代方案,请参阅Greg Young's Event Store或(当前或多或少未维护)NEventStore .

  • -4

    No matter which order of Commit and Publish you choose, you will have issues if the second one fails.

    有多种方法可以解决此问题 . 以下是您的主要选择:

    • 使用与应用程序使用相同数据库的消息传递基础结构,以便可以使用单个事务 . 当一个非常简单的消息传递基础结构足够时,该解决方案是可行的,并且团队决定自己构建它 .

    • 使用2阶段提交 . 这会对性能产生影响,这可能与您的应用程序相关,也可能不相关 .

    • 手动确保您不会陷入不一致的状态 . 有关详细信息,请参阅this answer of mine,我认为复制整个答案并不合理 .

  • 7

    这是一种反模式,你不应该这样做 .

    显然放置一个链接阻止我登录为“可疑请求”

    https://www.youtube.com/watch?v=GbM1ghLeweU

相关问题