首页 文章

如何在期货中使用Scala ARM?

提问于
浏览
9

我想实现ARM(自动资源管理)模式,其中资源是异步使用的 .

问题

假设我的资源看起来像:

class MyResource { 
  def foo() : Future[MyResource] = ???
  // Other methods returning various futures
  def close() : Unit = ???
}
object MyResource { 
  def open(name: String): Future[MyResource] = ???
}

所需的使用模式是:

val r : Future[MyResource] = MyResource.open("name")
r flatMap (r => {
  r.foo() /* map ... */ andThen {
    case _ => r.close()
  }
})

省略的映射函数可能很复杂,涉及分支和链接期货,这些期货会重复调用返回期货的 r 方法 .

我想确保在所有未来的延续完成(或失败)之后调用 r.close() . 在每个呼叫站点手动执行此操作非常容易出错 . 这需要ARM解决方案 .

尝试过的解决方案

scala-arm库通常是同步的 . 这段代码不会做正确的事情,因为在块内的期货完成之前会调用close():

for (r <- managed(MyResource.open("name"))) {
  r map (_.foo()) // map ...
}

我虽然使用这个包装器:

def usingAsync[T](opener: => Future[MyResource]) (body: MyResource => Future[T]) : Future[T] =
  opener flatMap { 
    myr => body(myr) andThen { case _ => myr.close() } }

然后呼叫站点看起来像:

usingAsync(MyResource.open("name")) ( myr => {
  myr.foo // map ...
})

但是,块中的代码将负责返回在该块创建的所有其他期货完成时完成的Future . 如果它意外没有,那么资源将在所有使用它的期货完成之前关闭 . 并且没有静态验证来捕获此错误 . 例如,这将是一个运行时错误:

usingAsync(MyResource.open("name")) ( myr => {
  myr.foo() // Do one thing
  myr.bar() // Do another
})

如何解决这个问题?

显然,我可以使用scala-arm的分隔连续支持(CPS) . 它看起来有点复杂,我害怕弄错 . 它需要启用编译器插件 . 此外,我的团队对scala非常陌生,我不想要求他们使用CPS .

CPS是前进的唯一途径吗?是否有一个图书馆或设计模式可以更简单地使用Futures,或者使用scala-arm执行此操作的示例?

1 回答

  • 2

    Reactive Extensions(Rx)可能是另一种解决方案 . 围绕这种编程范式的势头越来越大,现在有许多语言可供使用,包括Scala .

    Rx的基础是创建一个Observable,它是异步事件的来源 . Observable可以用复杂的方式链接,这就是它的力量 . 您订阅了一个Observable来监听onNext,onError和onComplete事件 . 您还会收到允许您取消订阅的订阅 .

    我想你可能会在onCompleted和/或onError处理程序中添加一个resource.close()调用 .

    请参阅RxScala文档:

    Observable.subscribe(
        onNext: (T) ⇒ Unit, 
        onError: (Throwable) ⇒ Unit, 
        onCompleted: () ⇒ Unit): Subscription
    

    更多信息:

相关问题