我想实现ARM(自动资源管理)模式,其中资源是异步使用的 .
问题
假设我的资源看起来像:
class MyResource {
def foo() : Future[MyResource] = ???
// Other methods returning various futures
def close() : Unit = ???
}
object MyResource {
def open(name: String): Future[MyResource] = ???
}
所需的使用模式是:
val r : Future[MyResource] = MyResource.open("name")
r flatMap (r => {
r.foo() /* map ... */ andThen {
case _ => r.close()
}
})
省略的映射函数可能很复杂,涉及分支和链接期货,这些期货会重复调用返回期货的 r
方法 .
我想确保在所有未来的延续完成(或失败)之后调用 r.close()
. 在每个呼叫站点手动执行此操作非常容易出错 . 这需要ARM解决方案 .
尝试过的解决方案
scala-arm库通常是同步的 . 这段代码不会做正确的事情,因为在块内的期货完成之前会调用close():
for (r <- managed(MyResource.open("name"))) {
r map (_.foo()) // map ...
}
我虽然使用这个包装器:
def usingAsync[T](opener: => Future[MyResource]) (body: MyResource => Future[T]) : Future[T] =
opener flatMap {
myr => body(myr) andThen { case _ => myr.close() } }
然后呼叫站点看起来像:
usingAsync(MyResource.open("name")) ( myr => {
myr.foo // map ...
})
但是,块中的代码将负责返回在该块创建的所有其他期货完成时完成的Future . 如果它意外没有,那么资源将在所有使用它的期货完成之前关闭 . 并且没有静态验证来捕获此错误 . 例如,这将是一个运行时错误:
usingAsync(MyResource.open("name")) ( myr => {
myr.foo() // Do one thing
myr.bar() // Do another
})
如何解决这个问题?
显然,我可以使用scala-arm的分隔连续支持(CPS) . 它看起来有点复杂,我害怕弄错 . 它需要启用编译器插件 . 此外,我的团队对scala非常陌生,我不想要求他们使用CPS .
CPS是前进的唯一途径吗?是否有一个图书馆或设计模式可以更简单地使用Futures,或者使用scala-arm执行此操作的示例?
1 回答
Reactive Extensions(Rx)可能是另一种解决方案 . 围绕这种编程范式的势头越来越大,现在有许多语言可供使用,包括Scala .
Rx的基础是创建一个Observable,它是异步事件的来源 . Observable可以用复杂的方式链接,这就是它的力量 . 您订阅了一个Observable来监听onNext,onError和onComplete事件 . 您还会收到允许您取消订阅的订阅 .
我想你可能会在onCompleted和/或onError处理程序中添加一个resource.close()调用 .
请参阅RxScala文档:
更多信息:
RxScala网站:http://rxscala.github.io/
RxScala Observable:http://rxscala.github.io/scaladoc/index.html#rx.lang.scala.Observable
本·克里斯滕森在NetFlix上的简介:http://www.infoq.com/presentations/netflix-functional-rx
Erik Meijer提供了链式Observable的代码示例,以及Martin Odersky,Erik Meijer和Roland Kuhn在Coursera课程中的反应式编程原理 .