Lagom reference documentation显示了如何标记事件:
object BlogEvent {
val BlogEventTag = AggregateEventTag[BlogEvent]
}
sealed trait BlogEvent extends AggregateEvent[BlogEvent] {
override def aggregateTag: AggregateEventTag[BlogEvent] =
BlogEvent.BlogEventTag
}
密封的特征表明可以标记父事件以按顺序处理所有子项:
具有特定标记的所有事件都可以作为顺序的有序事件流使用 .
所以我们采用这种方式,我们标记了我们的父事件,并且我们使用光滑实现了一个ReadSideProcessor并且它没有工作 . 提高日志记录级别我们看到"unhandled message",我们在 SlickReadSideImpl
中发现了以下内容:
override def handle(): Flow[EventStreamElement[Event], Done, NotUsed] =
Flow[EventStreamElement[Event]]
.mapAsync(parallelism = 1) { element =>
val dbAction = eventHandlers.get(element.event.getClass)
.map { handler =>
// apply handler if found
handler(element)
}
.getOrElse {
// fallback to empty action if no handler is found
if (log.isDebugEnabled) log.debug("Unhandled event [{}]", element.event.getClass.getName)
DBIO.successful(())
}
.flatMap { _ =>
// whatever it happens we save the offset
offsetDao.updateOffsetQuery(element.offset)
}
.map(_ => Done)
slick.db.run(dbAction.transactionally)
}
如果类与注册处理程序的类完全匹配,则上面的 eventHandlers.get(element.event.getClass)
无法找到任何处理程序,例如它是子类(我们的情况) .
这有点令人困惑:这是期望的行为还是 JDBCReadSideImpl
和 SlickReadSideImpl
的实施中的错误?
-
如果是所需行为,则不应标记密封的特征事件(可能需要在文档中更新)
-
如果是所需的行为,则JDBCReadSideImpl和SlickReadSideImpl不能使用从classname到handler的映射 .
1 回答
这是按预期工作的 . 期望您将要以不同的方式处理具体的事件类型,因此您将为每个具体类型注册一个处理程序 .
标记具有相同标记的所有事件子类型的要点是确保在同一实体上的不同类型的事件之间进行排序 . 例如,如果您有
BlogCreatedEvent
和BlogPublishedEvent
,那么您可能希望确保您的处理器在发布的事件之前接收创建的事件,即使它以不同方式处理它们 .