首页 文章

RxJava没有在后台线程上运行

提问于
浏览
0

我正在尝试在Room中保存数据,它需要一些后台线程来保存数据 . 所以我创造了一个像这样的观察者

val obs: Observable<MutableLiveData<List<Source>>>? = Observable.fromCallable(object :Callable<MutableLiveData<List<Source>>>{
            override fun call(): MutableLiveData<List<Source>> {

                return mutableLiveData
            }
        })

然后我订阅,观察和取消订阅它

obs?.subscribeOn(Schedulers.io())?.observeOn(AndroidSchedulers.mainThread())?.unsubscribeOn(Schedulers.io())
                ?.subscribe(object : Observer<MutableLiveData<List<Source>>>{
                    override fun onComplete() {

                    }

                    override fun onSubscribe(d: Disposable?) {

                    }

                    override fun onNext(value: MutableLiveData<List<Source>>?) {
                        for(source in value!!.value!!.iterator()){
                            sourceDao.insert(source)//this is the line number 87, that logcat is pointing
                        }
                    }

                    override fun onError(e: Throwable?) {
                        e?.printStackTrace()
                    }
                })

我在Schedulers.io线程上订阅它,然后在AndroidSchedulers.mainThread()上观察它,但我仍然没有得到后台线程错误 . 进一步来说

Caused by: java.lang.IllegalStateException: Cannot access database on the main thread since it may potentially lock the UI for a long period of time.
        at android.arch.persistence.room.RoomDatabase.assertNotMainThread(RoomDatabase.java:204)
        at android.arch.persistence.room.RoomDatabase.beginTransaction(RoomDatabase.java:251)
06-18 11:11:08.674 3732-3732/com.theanilpaudel.technewspro W/System.err:     at com.package.myapp.room.SourceDao_Impl.insert(SourceDao_Impl.java:63)
        at com.package.myapp.main.MainRepository$saveToRoom$1.onNext(MainRepository.kt:87)
        at com.package.myapp.main.MainRepository$saveToRoom$1.onNext(MainRepository.kt:76)

2 回答

  • 2

    在Observable中执行数据库操作,而不是在Observer中执行:

    val obs: Observable<MutableLiveData<List<Source>>>? = Observable.fromCallable(object :Callable<MutableLiveData<List<Source>>>{
            override fun call(): MutableLiveData<List<Source>> {
                 for(source in mutableLiveData!!.value!!.iterator()){
                      sourceDao.insert(source)
                 }
                return mutableLiveData
            })
            .subscribeOn(Schedulers.io())?.observeOn(AndroidSchedulers.mainThread())?.unsubscribeOn(Schedulers.io())
                ?.subscribe(object : Observer<MutableLiveData<List<Source>>>{
                    override fun onComplete() {
    
                    }
    
                    override fun onSubscribe(d: Disposable?) {
    
                    }
    
                    override fun onNext(value: MutableLiveData<List<Source>>?) {
                        // Nothing todo right more
                    }
    
                    override fun onError(e: Throwable?) {
                        e?.printStackTrace()
                    }
                })
        })
    
  • 1

    这是有道理的,因为你有 observeOn 主线程并且你正在观察者中做你的工作( observeOn 管理观察者的线程) .

    要解决此问题,您可以在 obs 上使用flatmap并在那里执行for循环 . 由于flatmap要求您返回Observable,因此您可以在for循环后 return Observable.just(true) .

相关问题