我知道这会破坏很多Rx规则,但我真的很喜欢RxJava-JDBC,我的队友也是如此 . 关系数据库是我们工作的核心,Rx也是如此 .
但是在某些情况下我们不希望以 Observable<ResultSet>
的形式发射,而是只想拥有一个基于拉取的Java 8 Stream<ResultSet>
或Kotlin Sequence<ResultSet>
. 但是我们已经习惯了只返回 Observable<ResultSet>
的RxJava-JDBC库 .
因此,我想知道是否有一种方法可以使用扩展函数将 Observable<ResultSet>
转换为 Sequence<ResultSet>
,而不是进行任何中间收集或 toBlocking()
调用 . 下面是我到目前为止所有的,但我现在正在试图连接基于推拉的系统,我无法缓冲,因为每个 onNext()
调用 ResultSet
都是有状态的 . 这是一项不可能的任务吗?
import rx.Observable
import rx.Subscriber
import java.sql.ResultSet
fun Observable<ResultSet>.asSequence() = object: Iterator<ResultSet>, Subscriber<ResultSet>() {
private var isComplete = false
override fun onCompleted() {
isComplete = true
}
override fun onError(e: Throwable?) {
throw UnsupportedOperationException()
}
override fun onNext(rs: ResultSet?) {
throw UnsupportedOperationException()
}
override fun hasNext(): Boolean {
throw UnsupportedOperationException()
}
override fun next(): ResultSet {
throw UnsupportedOperationException()
}
}.asSequence()
2 回答
我'm not sure that'是实现你想要的最简单方法,但你可以尝试这个代码 . 它通过创建阻塞队列并将
Observable
中的所有事件发布到此队列,将Observable
转换为Iterator
.Iterable
从队列中拉出事件,如果没有则阻塞 . 然后根据收到的当前事件修改自己的状态 .您可以使用以下辅助函数:
当为迭代器调用返回的序列时,将订阅observable .
如果一个observable在它订阅的同一个线程上发出元素(例如
Observable.just
),它将在它有机会被返回之前填充迭代器的缓冲区 . 在这种情况下,您可能需要通过调用subscribeOn
来直接订阅不同的线程:但是,虽然
toBlocking().getIterator()
并没有被迭代器及时消耗 . 如果ResultSet
在下一个ResultSet
到达时以某种方式过期,那可能会出现问题 .