我在AWS Lambda上运行了一个基于微服务的应用程序 . 其中两个微服务,最关键的微服务,使用事件源/ cqrs .
Background: (this is also for me to organize my thoughts)
我正在使用this library并在DynamoDB中存储事件并在AWS S3中进行投影 .
写入部分就像一个魅力:每个命令调用从DynamoDB加载聚合的当前状态(通过处理程序运行事件和/或加载缓存聚合),它决定接受或拒绝基于某些业务逻辑的命令,然后使用 KeyConditionExpression: 'aggregateId = :a AND version >= :v'
写入DynamoDB,其中版本是为该聚合处理的事件计数 . 如果存在冲突,则写入失败 . 对我来说似乎是一个很好的系统!
然后将每个事件广播到SNS(主题名称是服务名称),以便其他服务可以根据需要对事件做出反应 .
我真正挣扎的部分是阅读 . 预测存储在S3中,并使用为每个事件源处理的最后一个commitId进行标记 . 当读取查询进入时,它从S3加载整个投影状态(对于所有聚合),查询所有较新事件的事件源,计算最新状态(同样,对所有聚合 - 并将更新对象写入S3,如果它是更新),并根据查询参数返回状态的相关部分 .
My problem: (or one of them)
我认为我做错了 .
我的大多数预测只按重要属性分组ID,因此文件保持相对较小 . 但我还需要一种方法来检索单个聚合 . 使用投影看起来很疯狂,因为我需要每次加载整个状态(即每个预计的聚合)将新事件应用于该状态,然后检索我想要的记录(它甚至可能没有改变) .
这就是我现在正在做的,它表现良好(<100k记录),但我无法想象它会持续更长时间 .
另一个问题是查询 . 我需要为我需要查询的每个属性构建一个投影映射值以匹配aggregateIds!必须有更好的方法!
无论我如何看待这个问题,投影总是需要整个当前状态的任何新事件才能返回一个没有改变的单个记录 .
2 回答
我也这么认为;听起来你的查询与你的预测有关
是的,这听起来像一团糟 . 或者更具体地说,听起来像查询正在触发投影要完成的工作 .
如果您可以将查询与投影分离,那么事情会变得更容易 . 基本思想是您的查询不描述当前状态,它们描述了上次投影运行时的状态 .
同样的想法,不同的拼写:您从S3中缓存的文档中回答查询 . 检测到新事件时,运行预测,根据需要加载新数据,计算新文档,并替换缓存中的条目 .
我想到了一个三角形
命令将来自外部的信息带到记录簿
预测将记录簿中的信息带到缓存中
查询将缓存中的信息传递给外部世界
三角形的每条腿与其他部分异步运行 .
我建议你从查询中反向工作 - 你需要哪些文件来支持每个查询?您必须击败的延迟目标是什么?然后你开始 balancer 权衡 - 对于这个新的查询,我是从现有文档创建结果,还是我需要一个用更细粒度构建的新文档?
是的,......事件只是触发的一种方式;您还可以通过时钟触发投影过程(每15分钟检查一次,看看我们是否需要更新)或者是人工操作员的心血来潮(嗯,看起来您的账户余额过时了,让我试着更新一下为了你) . 不止一种方法,你可以混合和匹配策略 .
不必要 . 没有规则说您不能使用先前缓存的表示作为起点,然后从记录簿中仅提取您需要的更改 .
例如,假设您正在构建一个组合聚合
A{id:7}
和B{id:9}
的视图 . 你 grab 缓存的副本,并查看它的元数据(你把它放在你以前的写入的地方),并找到它内部的东西,如metadata:{A:{id:7, version:21}, B:{id:9, version:19}}
. 现在,您只需要在上次使用的事件之后加载事件,在内存中更新本地副本,更新元数据的本地副本,然后推送批次到缓存 .我不熟悉您的技术基础设施,但我实施预测的方式如下:
每个域事件都有一个跨越所有聚合根的全局序列号 . 投影是具有任意名称的读取模型,并且该最终处理的位置由该全局序列号表示 . 我可以随时添加一个新的投影,以及它的事件处理程序,它将从位置0开始 . 我可以随时清除投影并将位置设置为0.我也可以使用添加新的组合将替换现有的投影,即使需要数天也会进行构建,然后删除旧版本 .
有一项服务可以监视投影并使用事件存储,就像一个队列 . 投影服务在当前位置之后检查具有全局ID的事件,并将这些事件交给处理程序,然后更新位置 . 这是您的投影甚至可以过滤事件类型以提高性能的地方 .
这是基本的想法 . 您的预测就是您所查询的内容 . 一旦投影赶上事件存储的“头部”,事件存储中的事件将被投射到投影中 .
这将如何转化为您的技术空间我不太确定 . 我有一个名为Shuttle.Recall的实验正在进行C#,如果您想了解一些想法 .