首页 文章

使用Akka和Scalatra

提问于
浏览
4

我的目标是为我的小部件构建一个高度并发的后端 . 我现在将后端作为Web服务公开,它接收运行特定小部件的请求(使用Scalatra),从DB获取小部件的代码并在演员(使用Akka)中运行它,然后回复结果 . 所以想象我做的事情如下:

get("/run:id") {
  ...
  val actor = Actor.actorOf("...").start
  val result = actor !! (("Run",id), 10000)
  ...
}

现在我相信这不是最好的并发解决方案,我应该在一个actor实现中以某种方式结合侦听请求和运行小部件 . 您如何设计这个以获得最大的并发性?谢谢 .

1 回答

  • 5

    您可以在akka引导文件或您自己的ServletContextListener中启动您的actor,以便它们在不依赖于servlet的情况下启动 . 然后你可以用akka注册表查找它们 .

    Actor.registry.actorFor[MyActor] foreach { _ !! (("Run",id), 10000) }
    

    除此之外,目前还没有与scalatra真正整合akka . 因此,到目前为止,您可以做的最好的事情是对一群演员使用阻止请求 .

    我不确定,但我不需要为每个请求生成一个actor,而是拥有一个可以发送这些请求的widget actor池 . 如果您使用管理程序层次结构,则可以使用管理程序调整池的大小(如果池太大或太小) .

    class MyContextListener extends ServletContextListener {
    
      def contextInitialized(sce: ServletContextEvent) {
        val factory = SupervisorFactory(
          SupervisorConfig(
          OneForOneStrategy(List(classOf[Exception]), 3, 1000),
          Supervise(actorOf[WidgetPoolSupervisor], Permanent)
      }
    
      def contextDestroyed(sce: ServletContextEvent) {
        Actor.registry.shutdownAll()
      }
    }
    
    class WidgetPoolSupervisor extends Actor {
    
      self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 3, 1000)
    
      override def preStart() {
        (1 to 5) foreach { _ =>
           self.spawnLink[MyWidgetProcessor]
        }
        Scheduler.schedule(self, 'checkPoolSize, 5, 5, TimeUnit.MINUTES)
      }
    
      protected def receive = {
        case 'checkPoolSize => {
          //implement logic that checks how quick the actors respond and if 
          //it takes to long add some actors to the pool.
          //as a bonus you can keep downsizing the actor pool until it reaches 1
          //or until the message starts returning too late.
        }
      }
    }
    
    class ScalatraApp extends ScalatraServlet {
    
      get("/run/:id") {
        // the !! construct should not appear anywhere else in your code except
        // in the scalatra action. You don't want to block anywhere else, but in a 
        // scalatra action it's ok as the web request itself is synchronous too and needs to 
        // to wait for the full response to have come back anyway.
        Actor.registry.actorFor[MyWidgetProcessor] foreach { 
          _ !! ((Run, id), 10000) 
        } getOrElse {
          throw new HeyIExpectedAResultException()
        } 
      }
    }
    

    请将上面的代码视为恰好看起来像scala的伪代码,我只是想说明这个概念 .

相关问题