首页 文章

在处理Akka中的下一条消息之前等待异步Future调用

提问于
浏览
14

接收事件时,Akka Actors将一次处理一条消息,阻塞直到请求完成,然后再转到下一条消息 .

这适用于同步/阻塞任务,但是如果我想执行异步/非阻塞请求,Akka将继续处理而无需等待任务完成 .

例如:

def doThing():Future[Unit] = /* Non blocking request here */

 def receive = {
     case DoThing => doThing() pipeTo sender
 }

这将调用doThing()并开始处理未来,但在处理下一条消息之前不会等待它完成 - 它将尽可能快地执行队列中的下一条消息 .

实质上,似乎Akka认为“返回未来”是“完成处理”并转移到下一条消息 .

为了一次处理一条消息,似乎我需要主动阻止Actor线程来阻止它这样做

def receive = {
    case DoThing => sender ! blocking(Await.result(doThing()))
}

这感觉就像一个非常错误的方法 - 它是人为地阻塞代码中的一个线程,否则它应该是完全无阻塞的 .

当将Akka与Elixir演员比较时,我们可以通过使用尾调用来请求下一条消息而不需要人为阻塞来轻松避免这个问题 .

在Akka还有什么办法吗?

a)等待 Future 完成,然后处理下一条消息而不阻塞线程 .

b)使用显式尾调用或其他一些机制来使用基于拉的工作流而不是基于推送?

4 回答

  • 0

    使用Akka Streams,您可以使用mapAsync

    import akka.actor.ActorSystem
    import akka.stream._
    import akka.stream.scaladsl._
    import scala.concurrent.Future
    
    implicit val system = ActorSystem("ThingDoer")
    implicit val materializer = ActorMaterializer()
    implicit val ec = system.dispatcher
    
    case object DoThing
    
    def doThing(): Future[Unit] = Future {
      println("doing its thing")
    }
    
    Source((1 to 10).map(_ => DoThing))
      .mapAsync(parallelism = 1)(_ => doThing())
      .runWith(Sink.ignore)
    
  • 0

    与评论中建议的一样,您可以在等待 Future 解析时使用 Stashhttp://doc.akka.io/docs/akka/current/scala/actors.html#Stash)特征来存储传入的消息 .

    需要保存当前发件人,以便您不会不正确地关闭发件人actor参考 . 您可以通过类似下面定义的简单案例类来实现此目的 .

    class MyActor extends Actor with Stash {
    
      import context.dispatcher
    
      // Save the correct sender ref by using something like
      // case class WrappedFuture(senderRef: ActorRef, result: Any)
      def doThing(): Future[WrappedFuture] = ???
    
      override def receive: Receive = {
        case msg: DoThing =>
          doThing() pipeTo self
    
          context.become({
            case WrappedFuture(senderRef, result) =>
              senderRef ! result
              unstashAll()
              context.unbecome()
            case newMsg: DoThing =>
              stash()
          }, discardOld = false)
      }
    }
    
  • 0

    不是让一个演员处理这个问题,而是有两个链:

    • Actor 1接收初始消息,启动所有IO调用并将它们合并到Future中

    • 演员2收到合并期货的结果 .

    这并不保证保留消息排序,因此如果您需要,那么Actor 2必须知道Actor 1已经看到的消息,并且可能将早期消息存储在自身上 .

    我不知道Akka中有什么能解决这个问题 . 也许有一个库实现了这样的模式?

  • 8

    正如大卫建议的那样,但如果你想藏匿所有信息

    class MyActor extends Actor with Stash {
      def doThing(): Future[WrappedFuture] = {}
    
      override def receive: Receive = {
        case msg: DoThing =>
          val future = doThing() pipeTo sender
    
          //when future finishes unstash all messages
          future.onComplete {
            case Failure(e) => {
              unstashAll()
              context.unbecome()
            }
            case Success(value) =>  {
              unstashAll()
              context.unbecome()
            }
          }
          // Stash incoming messeges until finished with future,
          context.become({
            case _ ⇒ stash()
          }, discardOld = false)
      }
    }
    

相关问题