首页 文章

如何设置akka Actor容错?

提问于
浏览
8

我试图在akka Actors中获得容错行为 . 我正在研究一些代码,这些代码依赖于系统中的Actors可用于长时间的处理 . 我发现我的处理在几个小时后停止(它应该需要大约10个小时)并且没有太多事情发生 . 我相信我的演员没有从异常中恢复过来 .

我需要做什么才能永久地一对一地重新启动Actors?我希望可以通过此文档完成此操作http://akka.io/docs/akka/1.1.3/scala/fault-tolerance

我正在使用akka 1.1.3和scala 2.9

import akka.actor.Actor
import akka.actor.Actor._
import akka.actor.ActorRef
import akka.actor.MaximumNumberOfRestartsWithinTimeRangeReached
import akka.dispatch.Dispatchers
import akka.routing.CyclicIterator
import akka.routing.LoadBalancer
import akka.config.Supervision._


object TestActor {
  val dispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pool")
                   .setCorePoolSize(100)
                   .setMaxPoolSize(100)
                   .build
}

class TestActor(val name: Integer) extends Actor {
    self.lifeCycle = Permanent
    self.dispatcher = TestActor.dispatcher
    def receive = {
       case num: Integer => {  
         if( num % 2 == 0 )
           throw new Exception("This is a simulated failure")
         println("Actor: " + name + " Received: " + num)
         //Thread.sleep(100)
       }
    }

  override def postStop(){
    println("TestActor post Stop ")
  }

  //callback method for restart handling 
  override def preRestart(reason: Throwable){
    println("TestActor "+ name + " restaring after shutdown because of " + reason)
  }

  //callback method for restart handling 
  override def postRestart(reason: Throwable){
    println("Restaring TestActor "+name+"after shutdown because of " + reason)
  }  
}

trait CyclicLoadBalancing extends LoadBalancer { this: Actor =>
    val testActors: List[ActorRef]
    val seq = new CyclicIterator[ActorRef](testActors)
}

trait TestActorManager extends Actor {
     self.lifeCycle = Permanent
     self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 1000, 5000)
     val testActors: List[ActorRef]
     override def preStart = testActors foreach { self.startLink(_) }
     override def postStop = { System.out.println("postStop") }
}


  object FaultTest {
    def main(args : Array[String]) : Unit = {
      println("starting FaultTest.main()")
      val numOfActors = 5
      val supervisor = actorOf(
        new TestActorManager with CyclicLoadBalancing {
             val testActors = (0 until numOfActors toList) map (i => actorOf(new TestActor(i)));
        }
      )

      supervisor.start();

      println("Number of Actors: " +  Actor.registry.actorsFor(classOf[TestActor]).length)

      val testActor = Actor.registry.actorsFor(classOf[TestActor]).head

      (1 until 200 toList) foreach { testActor ! _ }

    }
  }

这段代码在LoadBalancer后面设置了5个Actors,它只打印出发送给它们的整数,除了它们在偶数上抛出异常以模拟故障 . 整数0到200被发送到这些Actors . 我希望奇数会得到输出,但是在偶数偶数故障之后,一切似乎都会关闭 . 使用sbt运行此代码会导致此输出:

[info] Running FaultTest 
starting FaultTest.main()
Loading config [akka.conf] from the application classpath.
Number of Actors: 5
Actor: 2 Received: 1
Actor: 2 Received: 9
Actor: 1 Received: 3
Actor: 3 Received: 7
[info] == run ==
[success] Successful.
[info] 
[info] Total time: 13 s, completed Aug 16, 2011 11:00:23 AM

我认为这里发生的是5个演员开始,前5个偶数让他们破产,他们没有重新开始 .

如何更改此代码以便Actors从异常中恢复?

我希望这实际上会打印出从1到200的所有奇数 . 我认为每个参与者都会在偶数上失败,但是会在例外情况下使用一个经过处理的邮箱重新启动 . 我希望看到preRestart和postRestart的println . 需要在此代码示例中配置什么才能实现这些目标?

以下是关于akka和Actors的一些额外假设,可能会导致我的误解 . 我假设可以使用Supervisor或faultHandler配置Actor,以便在接收期间抛出异常时重新启动并继续可用 . 我假设发送给actor的消息如果在接收期间抛出异常将丢失 . 我假设将调用抛出异常的actor上的preRestart()和postRestart() .

代码示例代表我正在尝试做的事情并基于Why is my Dispatching on Actors scaled down in Akka?

另一个代码示例

这是另一个更简单的代码示例 . 我正在创建一个在偶数上抛出异常的actor . 路上没有负载均衡器或其他东西 . 我正在尝试打印出关于演员的信息 . 在将消息发送给Actor并监视正在发生的事情之后,我等待退出程序一分钟 .

我希望这会打印出奇怪的数字,但看起来Actor会在其邮箱中找到消息 .

我是否将OneForOneStrategy设置错误?我需要将Actor链接到某个东西吗?这种配置是否从根本上误导了我? Dispatcher是否需要设置容错,如何?我可以搞乱Dispatcher中的线程吗?

import akka.actor.Actor
import akka.actor.Actor._
import akka.actor.ActorRef
import akka.actor.ActorRegistry
import akka.config.Supervision._

class SingleActor(val name: Integer) extends Actor {
    self.lifeCycle = Permanent
    self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 30, 1000)
    def receive = {
       case num: Integer => {  
         if( num % 2 == 0 )
            throw new Exception("This is a simulated failure, where does this get logged?")
         println("Actor: " + name + " Received: " + num)
       }
    }

  override def postStop(){
    println("TestActor post Stop ")
  }

  override def preRestart(reason: Throwable){
    println("TestActor "+ name + " restaring after shutdown because of " + reason)
  }

  override def postRestart(reason: Throwable){
    println("Restaring TestActor "+name+"after shutdown because of " + reason)
  }  
}

object TestSingleActor{

    def main(args : Array[String]) : Unit = {
      println("starting TestSingleActor.main()")

      val testActor = Actor.actorOf( new SingleActor(1) ).start()

      println("number of actors: " + registry.actors.size)
      printAllActorsInfo

      (1 until 20 toList) foreach { testActor ! _ }

      for( i <- 1 until 120 ){
        Thread.sleep(500)
        printAllActorsInfo
      }
    }

  def printAllActorsInfo() ={
    registry.actors.foreach( (a) =>
       println("Actor hash: %d has mailbox %d isRunning: %b isShutdown: %b isBeingRestarted: %b "
               .format(a.hashCode(),a.mailboxSize,a.isRunning,a.isShutdown,a.isBeingRestarted)))
  }
}

我得到的输出如下:

[info] Running TestSingleActor 
starting TestSingleActor.main()
Loading config [akka.conf] from the application classpath.
number of actors: 1
Actor hash: -1537745664 has mailbox 0 isRunning: true isShutdown: false isBeingRestarted: false 
Actor: 1 Received: 1
Actor hash: -1537745664 has mailbox 17 isRunning: true isShutdown: false isBeingRestarted: false 

... 117 more of these lines repeted ...

Actor hash: -1537745664 has mailbox 17 isRunning: true isShutdown: false isBeingRestarted: false 
[info] == run ==
[success] Successful.
[info] 
[info] Total time: 70 s, completed Aug 17, 2011 2:24:49 PM

2 回答

  • 5

    问题是我使用的是akka.conf文件 . 我使用的是参考1.1.3 akka.conf文件,但配置事件处理程序的行除外 .

    我的(破碎的):

    event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
    

    参考1.1.3(有效的):

    event-handlers = ["akka.event.EventHandler$DefaultListener"]
    

    使用我的事件处理程序配置行,不会发生Actor重新启动 . 参考1.1.3线路重启很奇妙 .

    我根据这些说明进行了此更改http://akka.io/docs/akka/1.1.3/general/slf4j.html

    因此,通过删除该页面中的建议并返回到1.1.3参考akka.conf,我能够获得容错的Actors .

  • 1

    我相信你的问题在发送消息后终止,你不是试图保持你的异步应用程序活着,所以主线程退出,并用它来解决所有问题 .

相关问题