RxJava,副作用和返回void的方法

我使用RxJava编写了一个存储库,但我无法找到处理repo写入和删除的最佳方法 .

写这篇文章时,我的目标是确保能够从RxJava副作用方法调用任何写入或删除方法 . 这意味着写/删除方法必须同步运行并且不返回Observable . 这样我可以做这样的事情:

repository
    .fetchData()
    .doOnNext(s -> if (s.equals("something") {
                       repository.writeData(s);
                   }
    )
    .subscribe();

这是一个简单的例子,但它表明我调用了一个返回Observable的存储库方法,订阅了Observable,当Observable的onNext()方法被调用时,我可以执行我的副作用,这是对内存缓存的同步写入或数据库 .

好的,这个例子效果很好,但现在我遇到了一个问题,我只需要从内存缓存或数据库中写入或删除数据 . 也就是说,我想使用RxJava来运行同步代码,其中方法返回void .

编写我的Repository的API的方式我想做一些像调用repository.deleteData()的东西,但是我想在主线程上做这件事 . 解决这个问题的一种方法是抛弃RxJava并创建一个线程来执行任务 . 这可行,但我用RxJava做其他所有事情,所以我也想在这里使用它 .

我找到了2个可能的解决方案但不确定它们有多正确:

1)使用 Observable.just(null) 启动一个observable,然后使用 doOnNext() 副作用方法调用我的存储库's method from. This works it just seems hacky but it lets me use RxJava and lets me specify the threads to subscribe on / observe on which makes threading easy and allows the repository'的API保持不变,使其仍然返回void .

2)我也可以做 Observable.defer() 之类的事情,然后从defer的调用方法我可以返回 Observable.just(repository.deleteData()) . 如果我这样做,我必须更改存储库的API,以便deleteData方法具有Void返回类型而不是void(并且该方法还添加了 return null; 以实现方法签名的 Contract ) . 这也允许我使用RxJava,指定我想要观察和订阅的线程并继续使用Observable,即使我真正做的是副作用 .

有没有办法让RxJava创建一个Observable,它调用一个带有void返回类型的方法,该方法在副作用方法之外运行同步代码?

回答(1)

3 years ago

那么,为什么不这样呢?

public static <T> Observable<T> observeCompletion(Runnable r) {
   return Observable.defer(() -> {
         r.run();
         return Observable.empty();
   };
}

现在,您可以通过onSubscribe执行所需的线程跳转,并将所有内容保存在Rx中 .