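I want to be able to make concurrent requests to multiple data stores and merge the results. I am trying to understand whether my approach is valid at all, or whether there is a better way to solve this problem. I am definitely new to Akka / Spray / Scala, and I really want to get a better understanding of how to build these components properly. Any suggestions or tips would be greatly appreciated. I am trying to wrap my head around using actors and futures for this type of implementation.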
Spray Service:
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.pattern.ask
import akka.util.Timeout
import spray.routing.HttpService
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{Failure, Success}

trait DemoService extends HttpService with Actor with ActorLogging {
  import context.dispatcher // ExecutionContext required by the onComplete directive
  implicit val timeout: Timeout = Timeout(5.seconds) // needed for `?` below

  // NOTE: named "redisactor" although it wraps the Mongo master actor
  val mongoMasterActor = context.actorOf(Props[MongoMasterActor], "redisactor")
  val dbMaster = context.actorOf(Props[DbMasterActor], "dbactor")

  val messageApiRouting =
    path("summary" / Segment / Segment) { (dataset, timeslice) =>
      onComplete(getDbResponses(dbMaster, dataset, timeslice)) {
        case Success(dbMessageResponse) => complete(s"The result was $dbMessageResponse")
        case Failure(ex) => complete(s"An error occurred: ${ex.getMessage}")
      }
    }

  /** Asks the given actor for the summary data of a specific dataset and timeslice
    *
    * @param mongoActor an actor reference to the actor that will handle the appropriate request routing
    * @param dataset The dataset for which the summary has been requested
    * @param timeslice The timeslice (Month, Week, Day, etc.) for which the summary has been requested
    */
  def getSummary(mongoActor: ActorRef, dataset: String, timeslice: String): Future[DbMessageResponse] = {
    log.debug(s"dataset: $dataset timeslice: $timeslice")
    val dbMessage = DbMessage("summary", dataset + timeslice)
    (mongoActor ? dbMessage).mapTo[DbMessageResponse]
  }

  /** Same request as getSummary, but sent to the DB master actor, which replies with the merged result */
  def getDbResponses(dbActor: ActorRef, dataset: String, timeslice: String): Future[SummaryResponse] = {
    log.debug(s"dataset: $dataset timeslice: $timeslice")
    val dbMessage = DbMessage("summary", dataset + timeslice)
    (dbActor ? dbMessage).mapTo[SummaryResponse]
  }

  /** Concatenates the two partial responses into a single payload */
  def getSummaryPayload(mongoSummary: DbMessageResponse, redisSummary: DbMessageResponse): String = {
    mongoSummary.response + redisSummary.response
  }
}
Akka Actor / Future mock db requests:
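import akka.actor.{Actor, ActorLogging, Props, Status}
import scala.concurrent.Future
import scala.util.{Failure, Success}

class DbMasterActor extends Actor with ActorLogging {
  import context.dispatcher // ExecutionContext for the futures below

  //TODO: Need to add routing to the config to limit instances
  val summaryActor = context.actorOf(Props(new SummaryActor), "summaryactor")

  def receive = {
    case msg: DbMessage =>
      // Capture the sender in a local val: a mutable field could be overwritten
      // by the next message before the future below completes.
      val originalSender = sender
      msg.query match {
        case "summary" =>
          getDbResults().onComplete {
            case Success(result) => originalSender ! result
            case Failure(ex) =>
              log.error(ex, "Summary query failed")
              // Tell the asker about the failure instead of letting the ask time out
              originalSender ! Status.Failure(ex)
          }
        case other => log.error("Unsupported query: {}", other)
      }
    // If there is no match, log an error
    case other => log.error("Received unknown message: {}", other)
  }

  def getDbResults(): Future[SummaryResponse] = {
    log.debug("hitting db results")
    // Both futures are created before the for-comprehension, so they run concurrently
    val mongoResult = Future { Thread.sleep(500); "Mongo" }
    val redisResult = Future { Thread.sleep(800); "redis" }
    for {
      mResult <- mongoResult
      rResult <- redisResult
    } yield SummaryResponse(mResult, rResult)
  }
}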
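The merge step in `getDbResults` can be tried on its own with plain `scala.concurrent` futures. The sketch below is only an illustration under assumptions: `MergedSummary`, `fetchMongo`, `fetchRedis`, `failingStore` and `merged` are hypothetical names that do not appear in the post, and the "unavailable" placeholder is just one possible way to handle a store that fails. It shows both lookups being started before they are combined, plus a per-store fallback so one failure does not discard the other result.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ConcurrentMergeSketch {
  // Hypothetical result type, standing in for SummaryResponse
  final case class MergedSummary(mongo: String, redis: String)

  // Hypothetical stand-ins for real data store calls
  def fetchMongo(): Future[String] = Future { blocking { Thread.sleep(500) }; "Mongo" }
  def fetchRedis(): Future[String] = Future { blocking { Thread.sleep(800) }; "redis" }
  def failingStore(): Future[String] = Future.failed(new RuntimeException("store down"))

  // Both arguments are evaluated before this method runs, so both lookups
  // are already in flight by the time they are combined here.
  def merged(mongo: Future[String], redis: Future[String]): Future[MergedSummary] = {
    // Replace a failed lookup with a placeholder instead of failing the whole merge
    val m = mongo.recover { case _: Exception => "unavailable" }
    val r = redis.recover { case _: Exception => "unavailable" }
    m.zip(r).map { case (a, b) => MergedSummary(a, b) }
  }

  def main(args: Array[String]): Unit = {
    val both = Await.result(merged(fetchMongo(), fetchRedis()), 5.seconds)
    println(both) // MergedSummary(Mongo,redis)

    val partial = Await.result(merged(failingStore(), fetchRedis()), 5.seconds)
    println(partial) // MergedSummary(unavailable,redis)
  }
}
```

Had the `Future { ... }` expressions been written inside the for-comprehension generators instead, the second lookup would only start after the first one finished.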
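1 Answer
After reading Effective Akka by Jamie Allen, I am going to try applying his "Cameo" pattern suggestion.
Slideshare: http://www.slideshare.net/shinolajla/effective-akka-scalaio
GitHub: https://github.com/jamie-allen/effective_akka
I think what I have created will work, but judging by Jamie's comments in the talk it does not sound like the best approach. I will update/edit this post with what I have implemented (or tried).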
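For readers unfamiliar with the idea, here is a rough sketch of what a Cameo-style actor could look like. It is not the code from the talk or from this post. It assumes Akka classic actors (2.3 or later, for `sender()`), and every name in it (`GetSummary`, `MongoResult`, `RedisResult`, `Summary`, `SummaryTimeout`, `SummaryCameo`, `MongoStub`, `RedisStub`, `SummaryMaster`) is hypothetical, as is the 2 second timeout. The point it illustrates: a short-lived actor is created per request, holds the original sender, collects the partial results, replies once, and stops itself.

```scala
import akka.actor.{Actor, ActorRef, Props}
import scala.concurrent.duration._

// Hypothetical messages, not the post's DbMessage / SummaryResponse types
case object GetSummary
final case class MongoResult(value: String)
final case class RedisResult(value: String)
final case class Summary(mongo: String, redis: String)
case object SummaryTimeout

// One cameo actor per request: it keeps the original sender, waits for both
// partial results, replies exactly once, then stops itself.
class SummaryCameo(originalSender: ActorRef) extends Actor {
  import context.dispatcher // ExecutionContext for the scheduler

  private var mongo: Option[String] = None
  private var redis: Option[String] = None

  // Assumed timeout of 2 seconds; the cameo sends itself SummaryTimeout
  private val timer =
    context.system.scheduler.scheduleOnce(2.seconds, self, SummaryTimeout)

  def receive = {
    case MongoResult(v) => mongo = Some(v); replyIfComplete()
    case RedisResult(v) => redis = Some(v); replyIfComplete()
    case SummaryTimeout =>
      originalSender ! SummaryTimeout
      context.stop(self)
  }

  // Runs the body only when both results have arrived
  private def replyIfComplete(): Unit =
    for (m <- mongo; r <- redis) {
      timer.cancel()
      originalSender ! Summary(m, r)
      context.stop(self)
    }
}

// Hypothetical data store actors that reply to whoever is set as the sender
class MongoStub extends Actor {
  def receive = { case GetSummary => sender() ! MongoResult("Mongo") }
}
class RedisStub extends Actor {
  def receive = { case GetSummary => sender() ! RedisResult("redis") }
}

class SummaryMaster(mongoStore: ActorRef, redisStore: ActorRef) extends Actor {
  def receive = {
    case GetSummary =>
      // Read sender() here, on the actor's own thread, before creating the cameo
      val originalSender = sender()
      val cameo = context.actorOf(Props(classOf[SummaryCameo], originalSender))
      // Both requests go out immediately; the replies are routed to the cameo
      mongoStore.tell(GetSummary, cameo)
      redisStore.tell(GetSummary, cameo)
  }
}
```

A caller would send `GetSummary` to a `SummaryMaster` (for example with `ask`) and get back either a `Summary` or `SummaryTimeout`. Because the per-request state lives in the cameo, the master keeps no mutable sender field and never blocks on a future.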
Summary Actor (Cameo Actor):
Common:
Mock Stubs:
Boot (You should use test, but was just wanting to run from boot):
Application Config: