首页 文章

RxJava-将Observable转换为Iterator,Stream或Sequence

提问于
浏览
2

我知道这会破坏很多Rx规则,但我真的很喜欢RxJava-JDBC,我的队友也是如此 . 关系数据库是我们工作的核心,Rx也是如此 .

但是在某些情况下我们不希望以 Observable<ResultSet> 的形式发射,而是只想拥有一个基于拉取的Java 8 Stream<ResultSet> 或Kotlin Sequence<ResultSet> . 但是我们已经习惯了只返回 Observable<ResultSet> 的RxJava-JDBC库 .

因此,我想知道是否有一种方法可以使用扩展函数将 Observable<ResultSet> 转换为 Sequence<ResultSet> ,而不是进行任何中间收集或 toBlocking() 调用 . 下面是我到目前为止所有的,但我现在正在试图连接基于推拉的系统,我无法缓冲,因为每个 onNext() 调用 ResultSet 都是有状态的 . 这是一项不可能的任务吗?

import rx.Observable
import rx.Subscriber
import java.sql.ResultSet

fun Observable<ResultSet>.asSequence() = object: Iterator<ResultSet>, Subscriber<ResultSet>() {

    private var isComplete = false

    override fun onCompleted() {
        isComplete = true
    }

    override fun onError(e: Throwable?) {
        throw UnsupportedOperationException()
    }

    override fun onNext(rs: ResultSet?) {
        throw UnsupportedOperationException()
    }


    override fun hasNext(): Boolean {
        throw UnsupportedOperationException()
    }

    override fun next(): ResultSet {
        throw UnsupportedOperationException()
    }

}.asSequence()

2 回答

  • 3

    我'm not sure that'是实现你想要的最简单方法,但你可以尝试这个代码 . 它通过创建阻塞队列并将 Observable 中的所有事件发布到此队列,将 Observable 转换为 Iterator . Iterable 从队列中拉出事件,如果没有则阻塞 . 然后根据收到的当前事件修改自己的状态 .

    class ObservableIterator<T>(
        observable: Observable<T>,
        scheduler: Scheduler
    ) : Iterator<T>, Closeable {
    
      private val queue = LinkedBlockingQueue<Notification<T>>()
      private var cached: Notification<T>? = null
      private var completed: Boolean = false
    
      private val subscription =
          observable
              .materialize()
              .subscribeOn(scheduler)
              .subscribe({ queue.put(it) })
    
      override fun hasNext(): Boolean {
        cacheNext()
        return !completed
      }
    
      override fun next(): T {
        cacheNext()
        val notification = cached ?: throw NoSuchElementException()
        check(notification.isOnNext)
        cached = null
        return notification.value
      }
    
      private fun cacheNext() {
        if (completed) {
          return
        }
    
        if (cached == null) {
          queue.take().let { notification ->
            if (notification.isOnError) {
              completed = true
              throw RuntimeException(notification.throwable)
            } else if (notification.isOnCompleted) {
              completed = true
            } else {
              cached = notification
            }
          }
        }
      }
    
      override fun close() {
        subscription.unsubscribe()
        completed = true
        cached = null
      }
    }
    
  • 2

    您可以使用以下辅助函数:

    fun <T> Observable<T>.asSequence() = Sequence { toBlocking().getIterator() }
    

    当为迭代器调用返回的序列时,将订阅observable .

    如果一个observable在它订阅的同一个线程上发出元素(例如 Observable.just ),它将在它有机会被返回之前填充迭代器的缓冲区 . 在这种情况下,您可能需要通过调用 subscribeOn 来直接订阅不同的线程:

    observable.subscribeOn(scheduler).asSequence()
    

    但是,虽然 toBlocking().getIterator() 并没有被迭代器及时消耗 . 如果 ResultSet 在下一个 ResultSet 到达时以某种方式过期,那可能会出现问题 .

相关问题