首页 文章

如何观看多个akka演员终止

提问于
浏览
0

我有一个akka系统,它基本上是两个 生产环境 者演员,它们向一个消费者演员发送消息 . 在简化的形式我有这样的事情:

class ProducerA extends Actor {
    def receive = {
        case Produce => Consumer ! generateMessageA()
    }

    ... more code ...
}

class ProducerB extends Actor {
    def receive = {
        case Produce => Consumer ! generateMessageB()
    }

    ... more code ...
}

class Consumer extends Actor {
    def receive = {
        case A => handleMessageA(A)
        case B => handleMessageB(B)
    }

    ... more code ...
}

他们都是同一个akka系统的兄弟姐妹 .

我试图弄清楚如何优雅地终止这个系统 . 这意味着在关机时我希望 ProducerAProducerB 立即停止然后我希望 Consumer 完成处理消息队列中剩余的任何消息然后关闭 .

看起来我想要的是 Consumer 演员能够观察 ProducerAProducerB 的终止 . 或者通常,似乎我想要的是能够在两个 生产环境 者停止后向 Consumer 发送 PoisonPill 消息 .

https://alvinalexander.com/scala/how-to-monitor-akka-actor-death-with-watch-method

上面的教程很好地解释了一个演员如何观看另一个演员的终止,但不确定演员如何观察多个演员的终止 .

3 回答

  • 0
    import akka.actor._
    import akka.util.Timeout
    import scala.concurrent.duration.DurationInt
    
    class Producer extends Actor {
      def receive = {
        case _ => println("Producer received a message")
      }
    }
    
    case object KillConsumer
    
    class Consumer extends Actor {
    
      def receive = {
        case KillConsumer =>
          println("Stopping Consumer After All Producers")
          context.stop(self)
        case _ => println("Parent received a message")
      }
    
      override def postStop(): Unit = {
        println("Post Stop Consumer")
        super.postStop()
      }
    }
    
    class ProducerWatchers(producerListRef: List[ActorRef], consumerRef: ActorRef) extends Actor {
      producerListRef.foreach(x => context.watch(x))
      context.watch(consumerRef)
      var producerActorCount = producerListRef.length
      implicit val timeout: Timeout = Timeout(5 seconds)
      override def receive: Receive = {
        case Terminated(x) if producerActorCount == 1 && producerListRef.contains(x) =>
          consumerRef ! KillConsumer
    
        case Terminated(x) if producerListRef.contains(x) =>
          producerActorCount -= 1
    
        case Terminated(`consumerRef`) =>
          println("Killing ProducerWatchers On Consumer End")
          context.stop(self)
    
        case _ => println("Dropping Message")
      }
    
      override def postStop(): Unit = {
        println("Post Stop ProducerWatchers")
        super.postStop()
      }
    }
    
    object ProducerWatchers {
      def apply(producerListRef: List[ActorRef], consumerRef: ActorRef) : Props = Props(new ProducerWatchers(producerListRef, consumerRef))
    }
    
    object AkkaActorKill {
      def main(args: Array[String]): Unit = {
        val actorSystem = ActorSystem("AkkaActorKill")
        implicit val timeout: Timeout = Timeout(10 seconds)
    
        val consumerRef = actorSystem.actorOf(Props[Consumer], "Consumer")
        val producer1 = actorSystem.actorOf(Props[Producer], name = "Producer1")
        val producer2 = actorSystem.actorOf(Props[Producer], name = "Producer2")
        val producer3 = actorSystem.actorOf(Props[Producer], name = "Producer3")
    
        val producerWatchers = actorSystem.actorOf(ProducerWatchers(List[ActorRef](producer1, producer2, producer3), consumerRef),"ProducerWatchers")
    
        producer1 ! PoisonPill
        producer2 ! PoisonPill
        producer3 ! PoisonPill
    
        Thread.sleep(5000)
        actorSystem.terminate
      }
    }
    

    它可以使用 ProducerWatchers actor实现,它管理 生产环境 者被杀死,一旦所有生成器被杀死,你可以杀死Consumer actor,然后是ProducerWatchers actor .

  • 1

    一个演员可以通过 context.watch 的多次调用来观看多个演员,每次调用都会传入不同的 ActorRef . 例如,您的 Consumer actor可以通过以下方式观看 Producer 演员的终止:

    case class WatchMe(ref: ActorRef)
    
    class Consumer extends Actor {
      var watched = Set[ActorRef]()
    
      def receive = {
        case WatchMe(ref) =>
          context.watch(ref)
          watched = watched + ref
        case Terminated(ref) =>
          watched = watched - ref
          if (watched.isEmpty) self ! PoisonPill
        // case ...
      }
    }
    

    两个 Producer 演员都会发送他们各自对 Consumer 的引用,然后监视 Producer 演员终止 . 当 Producer actor都被终止时, Consumer 会向自己发送一个 PoisonPill . 因为PoisonPill is treated like a normal message in an actor's mailboxConsumer 将在处理 PoisonPill 并关闭自身之前处理已排队的所有消息 .

    类似的模式在Derek Wyatt's "Shutdown Patterns in Akka 2" blog post中描述,在Akka文档中提到 .

  • 1

    所以我最终选择的解决方案受到Derek Wyatt's terminator pattern的启发

    val shutdownFut = Future.sequence(
      Seq(
        gracefulStop(producerA, ProducerShutdownWaitSeconds seconds),
        gracefulStop(producerB, ProducerShutdownWaitSeconds seconds),
      )
    ).flatMap(_ => gracefulStop(consumer, ConsumerShutdownWaitSeconds seconds))
    
    Await.result(shutdownFut, (ProducerShutdownWaitSeconds seconds) + (ConsumerShutdownWaitSeconds seconds))
    

    这或多或少正是我想要的 . 根据期货的履行情况,消费者关闭等待 生产环境 者关闭 . 此外,整个关闭本身会导致未来,您可以等待,因此能够保持线程足够长,以便正确清理所有内容 .

相关问题