首页 文章

Akka:正确使用`ask`模式?

提问于
浏览
12

我正试图找到 Futures 并在akka中询问模式 .

所以,我制作了两个演员,一个要求另一个演员给他回信息 . 好吧,根据akka的 Futures 文件,演员应该询问( ? )的消息,它应该给他一个 Future instanse . 然后actor应该阻止(使用 Await )获得 Future 结果 .

好吧,我永远不会完成我的未来 . 这是为什么?

代码是:

package head_thrash

import akka.actor._
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

object Main extends App {

  val system = ActorSystem("actors")

  val actor1 = system.actorOf(Props[MyActor], "node_1")
  val actor2 = system.actorOf(Props[MyActor], "node_2")

  actor2 ! "ping_other"

  system.awaitTermination()

  Console.println("Bye!")
}

class MyActor extends Actor with ActorLogging {
  import akka.pattern.ask

  implicit val timeout = Timeout(100.days)

  def receive = {
    case "ping_other" => {
      val selection = context.actorSelection("../node_1")
      log.info("Sending ping to node_1")
      val result = Await.result(selection ? "ping", Duration.Inf) // <-- Blocks here forever!
      log.info("Got result " + result)
    }
    case "ping" => {
      log.info("Sending back pong!")
      sender ! "pong"
    }
  }
}

如果我将 Duration.Inf 更改为 5.seconds ,则演员等待5秒,告诉我的未来是Timeouted(通过抛出 TimeoutException ),然后其他演员最终回复所需的消息 . 所以,没有异步发生 . 为什么? :-(

我该如何正确实施该模式?谢谢 .

2 回答

  • 5

    这不起作用的两个原因 .

    首先,“node_1”自问并且“ping”将不会被处理,因为它在等待请求时阻塞 .

    此外,相对路径(“../node_1”)存在actorSelection的缺点 . 它是通过消息传递处理的,并且由于你的actor阻塞它,它不能处理任何其他消息 . 在即将推出的2.3版Akka中,这已得到改进,但无论如何都应该避免阻塞 .

  • 10

    官方Akka documentation说Await.result会导致当前线程阻塞并等待Actor回复它的未来'complete' .

    奇怪的是,你的代码永远存在于那里,你的所有应用程序只有一个线程吗?

    无论如何,我想一个更“惯用”的代码编码方式就是对未来的成功使用回调 .

    def receive = {
        case "ping_other" => {
          val selection = context.actorSelection("../node_1")
          log.info("Sending ping to node_1")
          val future: Future[String] = ask(selection, "ping").mapTo[String]
          future.onSuccess { 
             case result : String ⇒ log.info("Got result " + result)
          }
        }
    ...
    

相关问题