我正在尝试实施CQRS解决方案 . 在我执行一些操作(执行一个命令)后,我将生成的事件(例如案例类(例如:AccountCreated))传递给另一个actor,然后使用akka流将事件通过线路推送到某个外部Sink .

假设事件类型是 Events ,我的发布actor的receive函数看起来像这样

override def receive = {
   case event: Events => {
  //define my handlers/Sink etc.
  //I should be able to pass my Events into Source()
 val result = Source(event).via(Tcp().outgoingconnection(add, p)) //obv this will fail. 
 }
}

它需要一个Iterable [T],例如 (1 to 100).map(ByteString(_)) 会起作用 . 即使通过 Seq(1, 2, 3) 也行不通 .

/**
   * Helper to create [[Source]] from `Iterable`.
   * Example usage: `Source(Seq(1,2,3))`

正如apply方法上面的源代码注释中所给出的那样 .

tl;dr 我在演员中得到一个事件,一旦收到它,我需要将其添加到Akka流的源中 .