首页 文章

在Scala中实现没有Akka的Actor模型

提问于
浏览
0

我正在做我的小型研究,实现没有Akka的Actor我在Scala中找到了一个Actor的实现 . (How to implement actor model without Akka?

这很简单 . 因为我没有足够的声誉来添加评论,所以我创建了这个问题 . 我想知道我是否使用如下的Actor .

1 /如何从主线程中关闭该actor?

2 /如何添加类似于Akka的功能,如父演员,杀戮请求,并成为方法?

import scala.concurrent._

trait Actor[T] {
      implicit val context = ExecutionContext.fromExecutor(java.util.concurrent.Executors.newFixedThreadPool(1))
      def receive: T => Unit
      def !(m: T) = Future { receive(m) }
}

在尝试调整上面的代码片段时,这是我自己的示例

import scala.concurrent._

/**
  * Created by hminle on 10/21/2016.
  */
trait Message
case class HelloMessage(hello: String) extends Message
case class GoodByeMessage(goodBye: String) extends Message

object State extends Enumeration {
  type State = Value
  val Waiting, Running, Terminating = Value
}

trait Actor[T] {
  implicit val context = ExecutionContext.fromExecutor(java.util.concurrent.Executors.newFixedThreadPool(1))
  private var state: State.State = State.Waiting
  def handleMessage: T => Unit ={
    if(state == State.Waiting) handleMessageWhenWaiting
    else if(state == State.Running) handleMessageWhenRunning
    else handleMessageWhenTerminating
  }
  def !(m: T) = Future {handleMessage(m)}
  def handleMessageWhenWaiting: T => Unit
  def handleMessageWhenRunning: T => Unit
  def handleMessageWhenTerminating: T => Unit
  def transitionTo(destinationState: State.State): Unit = {
    this.state = destinationState
  }
}

class Component1 extends Actor[Message]{
  def handleMessageWhenRunning = {
    case HelloMessage(hello) => {
      println(Thread.currentThread().getName + hello)
    }
    case GoodByeMessage(goodBye) => {
      println(Thread.currentThread().getName + goodBye)
      transitionTo(State.Terminating)
    }
  }

  def handleMessageWhenWaiting = {
    case m => {
      println(Thread.currentThread().getName + " I am waiting, I am not ready to run")
      transitionTo(State.Running)
    }
  }

  def handleMessageWhenTerminating = {
    case m => {
      println(Thread.currentThread().getName + " I am terminating, I cannot handle any message")
      //need to shutdown here
    }
  }

}
class Component2(component1: Actor[Message]) extends Actor[Message]{
  def handleMessageWhenRunning = {
    case HelloMessage(hello) => {
      println(Thread.currentThread().getName + hello)
      component1 ! HelloMessage("hello 1")
    }
    case GoodByeMessage(goodBye) => {
      println(Thread.currentThread().getName + goodBye)
      component1 ! GoodByeMessage("goodbye 1")
      transitionTo(State.Terminating)
    }
  }

  def handleMessageWhenWaiting = {
    case m => {
      println(Thread.currentThread().getName + " I am waiting, I am not ready to run")
      transitionTo(State.Running)
    }
  }

  def handleMessageWhenTerminating = {
    case m => {
      println(Thread.currentThread().getName + " I am terminating, I cannot handle any message")
      //need to shutdown here
    }
  }
}
object ActorExample extends App {
  val a = new Component1
  val b = new Component2(a)
  b ! HelloMessage("hello World 2")
  b ! HelloMessage("hello World 2, 2nd")
  b ! GoodByeMessage("Good bye 2")
  println(Thread.currentThread().getName)
}

1 回答

  • 0

    您可以在 scalaz 中查看 Actor model 实现并从中获取想法,scalaz actor中的源代码比 akka 更容易获得洞察力 . 您可以自由选择架构:您可以像在Akka中一样使用基于ConcurrentLinkedQueue的邮箱,在scalaz中使用CAS作为AtomicReffernce,在您使用Future机制的情况下 . IMO,你必须写一个你的actor系统的上下文,所以在你的问题中解决第一和第二项,它是ActorContext的变体:

    val contextStack = new ThreadLocal[List[ActorContext]]
    

    和关机可能如下所示:

    1 .

    case Kill                       ⇒ throw new ActorKilledException("Kill")
    case PoisonPill                 ⇒ self.stop()
    

    2.对于存储父actor和类似任务,您必须存储它们的引用:

    def parent: ActorRef
    

    很难说每种技术(CAS,邮箱)的优势,它可能是你研究的变种 .

相关问题