首页 文章

Akka Persistence:ReadJournal.runFold永远不会返回

提问于
浏览
0

我是第一次尝试Akka,更具体地说是Akka Persistence . 我最终试图实现一个小玩具程序来复制Akka在事件源应用程序中的使用 . 我已经取得了成功,直到我尝试使用 ReadJournal 将我的事件流投射到我的域中 .

def main(args: Array[String]): Unit = {
    val commands: EmployeeCommandStream = TestEmployeeCommandStream(EmployeeId.generate())

    implicit val executionContext = ExecutionContext.global
    implicit val system = ActorSystem.create("employee-service-actor-system")
    implicit val mat: Materializer = ActorMaterializer()(system)

    val service = system.actorOf(Props(classOf[EmployeeActor], commands.employeeId))

    commands.stream.foreach(command => service.tell(command, noSender))

    lazy val readJournal = PersistenceQuery(system).readJournalFor("inmemory-read-journal")
      .asInstanceOf[ReadJournal
      with CurrentPersistenceIdsQuery
      with CurrentEventsByPersistenceIdQuery
      with CurrentEventsByTagQuery
      with EventsByPersistenceIdQuery
      with EventsByTagQuery]

    println(Await.result(
      readJournal
        .eventsByPersistenceId(commands.employeeId.toString, 0L, Long.MaxValue)
        .map(_.event)
        .runFold(Employee.apply())({
          case (employee: Employee, event: EmployeeEvent) => employee.apply(event)
        }),
      Duration("10s")
    ))     
}

我的域名唯一聚合是 Employee ,所以我'm just starting up an actor with the UUID representing some employee, and then I' m为该员工发布了一些命令 .

在上面的示例中,如果我删除 println(Await.result(...)) 并将 .runFold(...) 替换为 .runForeach(println) ,则我的actor中保留的事件将按预期打印给每个给定的命令 . 所以我知道我的程序的写入方和 ReadJournal 都按预期工作 .

原样,我的程序终结了

Exception in thread "main" java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]

所以现在我的问题是,为什么我不能执行 runFold 来最终重播我的事件流?有一个更好的方法吗?我只是滥用API吗?

任何帮助将不胜感激,谢谢!

1 回答

相关问题