首页 文章

睡觉的演员?

提问于
浏览
15

让演员入睡最好的方法是什么?我将演员设置为代理,希望维护数据库的不同部分(包括从外部源获取数据) . 由于多种原因(包括没有超载数据库或通信和一般负载问题),我希望演员在每个操作之间休眠 . 我正在看10个演员对象 .

演员将无限运行,因为总会有新数据进入,或者坐在表中等待传播到数据库的其他部分等 . 想法是让数据库在任何时候都尽可能完整及时 .

我可以通过无限循环执行此操作,并在每个循环结束时进行休眠,但根据http://www.scala-lang.org/node/242,actor会使用一个线程池,只要所有线程都被阻塞,它就会被扩展 . 所以我想每个actor中的Thread.sleep都是个坏主意,因为会不必要地浪费线程 .

我可能有一个中央演员有自己的循环,在时钟上发送消息给订阅者(如异步事件时钟观察者)?

有没有人做过类似的事情或有任何建议?很抱歉有额外的(可能是多余的)信息 .

干杯

4 回答

  • 21

    没有必要明确地让一个actor睡觉:对每个actor使用 loopreact 意味着底层线程池将有等待线程,而没有消息供actor处理 .

    如果您希望为要处理的actor安排事件,使用 java.util.concurrent 实用程序中的单线程调度程序非常容易:

    object Scheduler {
      import java.util.concurrent.Executors
      import scala.compat.Platform
      import java.util.concurrent.TimeUnit
      private lazy val sched = Executors.newSingleThreadScheduledExecutor();
      def schedule(f: => Unit, time: Long) {
        sched.schedule(new Runnable {
          def run = f
        }, time , TimeUnit.MILLISECONDS);
      }
    }
    

    您可以对此进行扩展以执行定期任务,因此可以使用它:

    val execTime = //...  
    Scheduler.schedule( { Actor.actor { target ! message }; () }, execTime)
    

    然后,您的目标actor将只需要实现适当的 react 循环来处理给定的消息 . 你没有必要让任何演员入睡 .

  • 2

    在第一个答案中,Erlang有一个很好的观点,但它似乎消失了 . 您可以轻松地使用Scala actor执行与Erlang类似的技巧 . 例如 . 让我们创建一个不使用线程的调度程序:

    import actors.{Actor,TIMEOUT}
    
    def scheduler(time: Long)(f: => Unit) = {
      def fixedRateLoop {
        Actor.reactWithin(time) {
          case TIMEOUT => f; fixedRateLoop
          case 'stop => 
        }
      }
      Actor.actor(fixedRateLoop)
    }
    

    让我们使用测试客户端actor测试它(我在Scala REPL中做得很好):

    case class Ping(t: Long)
    
    import Actor._
    val test = actor { loop {
      receiveWithin(3000) {
        case Ping(t) => println(t/1000)
        case TIMEOUT => println("TIMEOUT")
        case 'stop => exit
      }
    } }
    

    运行调度程序:

    import compat.Platform.currentTime
    val sched = scheduler(2000) { test ! Ping(currentTime) }
    

    你会看到这样的东西

    scala> 1249383399
    1249383401
    1249383403
    1249383405
    1249383407
    

    这意味着我们的调度程序按预期每2秒发送一条消息 . 我们停止调度程序:

    sched ! 'stop
    

    测试客户端将开始报告超时:

    scala> TIMEOUT
    TIMEOUT
    TIMEOUT
    

    也停止它:

    test ! 'stop
    
  • 4

    lift-util的ActorPing(Apache许可证)有schedule和scheduleAtFixedRate来源:ActorPing.scala

    来自scaladoc:

    ActorPing对象以特定间隔调度要与给定消息进行ping操作的actor . schedule方法返回一个ScheduledFuture对象,必要时可以取消该对象

  • 17

    不幸的是,在oxbow_lakes的答案中有两个错误 .

    一个是简单的声明错误(长时间与时间:长),但第二个是更微妙的 .

    oxbow_lakes宣布为

    def run = actors.Scheduler.execute(f)
    

    然而,这会导致消息不时消失 . 那就是:它们已经预定但永远不会发送 . 声明运行为

    def run = f
    

    为我修好了 . 它完成了lift-util的ActorPing中的确切方式 .

    整个调度程序代码变为:

    object Scheduler {
        private lazy val sched = Executors.newSingleThreadedScheduledExecutor();
        def schedule(f: => Unit, time: Long) {
            sched.schedule(new Runnable {
              def run = f
            }, time - Platform.currentTime, TimeUnit.MILLISECONDS);
        }
    }
    

    我试着编辑oxbow_lakes帖子,但是无法保存它(坏了?),但我没有权利发表评论 . 因此一个新的帖子 .

相关问题