我是第一次尝试Akka,更具体地说是Akka Persistence . 我最终试图实现一个小玩具程序来复制Akka在事件源应用程序中的使用 . 我已经取得了成功,直到我尝试使用 ReadJournal
将我的事件流投射到我的域中 .
def main(args: Array[String]): Unit = {
val commands: EmployeeCommandStream = TestEmployeeCommandStream(EmployeeId.generate())
implicit val executionContext = ExecutionContext.global
implicit val system = ActorSystem.create("employee-service-actor-system")
implicit val mat: Materializer = ActorMaterializer()(system)
val service = system.actorOf(Props(classOf[EmployeeActor], commands.employeeId))
commands.stream.foreach(command => service.tell(command, noSender))
lazy val readJournal = PersistenceQuery(system).readJournalFor("inmemory-read-journal")
.asInstanceOf[ReadJournal
with CurrentPersistenceIdsQuery
with CurrentEventsByPersistenceIdQuery
with CurrentEventsByTagQuery
with EventsByPersistenceIdQuery
with EventsByTagQuery]
println(Await.result(
readJournal
.eventsByPersistenceId(commands.employeeId.toString, 0L, Long.MaxValue)
.map(_.event)
.runFold(Employee.apply())({
case (employee: Employee, event: EmployeeEvent) => employee.apply(event)
}),
Duration("10s")
))
}
我的域名唯一聚合是 Employee
,所以我'm just starting up an actor with the UUID representing some employee, and then I' m为该员工发布了一些命令 .
在上面的示例中,如果我删除 println(Await.result(...))
并将 .runFold(...)
替换为 .runForeach(println)
,则我的actor中保留的事件将按预期打印给每个给定的命令 . 所以我知道我的程序的写入方和 ReadJournal
都按预期工作 .
原样,我的程序终结了
Exception in thread "main" java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
所以现在我的问题是,为什么我不能执行 runFold
来最终重播我的事件流?有一个更好的方法吗?我只是滥用API吗?
任何帮助将不胜感激,谢谢!
1 回答
使用
runFold
,您正在折叠流 . 当流本身终止时,折叠将有效终止 .通过使用
eventsByPersistenceId
,您要求永不停止的直播活动流,因此您的弃牌不会终止 .您应该使用
currentEventsByPersistenceId
代替您的用例 . 此变体将流式传输日记中当前可用的事件并终止 .见https://doc.akka.io/docs/akka/2.5.6/scala/persistence-query.html#eventsbypersistenceidquery-and-currenteventsbypersistenceidquery