首页 文章

rxJava调度程序用例

提问于
浏览
224

在RxJava中有5 different schedulers可供选择:

immediate():创建并返回一个在当前线程上立即执行工作的Scheduler . trampoline():创建并返回一个调度程序,该调度程序对当前工作完成后要执行的当前线程进行排队 . newThread():创建并返回一个Scheduler,为每个工作单元创建一个新的Thread . computation():创建并返回用于计算工作的调度程序 . 这可以用于事件循环,处理回调和其他计算工作 . 不要在此调度程序上执行IO绑定工作 . 请改用Schedulers.io() . io():创建并返回一个用于IO绑定工作的调度程序 . 该实现由Executor线程池支持,该线程池将根据需要增长 . 这可用于异步执行阻塞IO . 不要在此调度程序上执行计算工作 . 请改用Schedulers.computation() .

问题:

前3个调度程序非常自我解释;但是,我对 computationio 有点困惑 .

  • 究竟是什么"IO-bound work"?它用于处理流( java.io )和文件( java.nio.files )吗?它用于数据库查询吗?它是用于下载文件还是访问REST API?

  • computation() 如何与 newThread() 不同?所有 computation() 调用是每次都在单个(后台)线程上而不是新(后台)线程吗?

  • 为什么在进行IO工作时调用 computation() 不好?

  • 为什么在进行计算工作时调用 io() 是不好的?

2 回答

  • 298

    最重要的一点是,Schedulers.io和Schedulers.computation都支持无限制的线程池,而不是问题中提到的其他线程池 . 在使用newCachedThreadPool(无限制的自动回收线程池)创建Executor的情况下,Scheduler.from(Executor)仅共享此特性 .

    正如之前的回复和Web上的多篇文章所充分说明的那样,Schedulers.io和Schedulers.computation应该谨慎使用,因为它们针对其名称中的工作类型进行了优化 . 但是,就我而言,他们最重要的角色是 provide real concurrency to reactive streams .

    与新手的看法相反,反应流本质上不是并发的,而是固有的异步和顺序 . 出于这个原因,只有当I / O操作阻塞时才使用Schedulers.io(例如:使用诸如Apache IOUtils FileUtils.readFileAsString(...)之类的阻塞命令)因此会冻结调用线程,直到操作为止 . 完成 .

    使用Java AsynchronousFileChannel(...)等异步方法不会在操作期间阻塞调用线程,因此使用单独的线程没有意义 . 事实上,Schedulers.io线程并不适合异步操作,因为它们不运行事件循环,并且回调永远不会被调用 .

    相同的逻辑适用于数据库访问或远程API调用 . 如果您可以使用异步或被动API来进行呼叫,请不要使用Schedulers.io .

    回到并发 . 您可能无法访问异步或反应API以异步或并发地执行I / O操作,因此您唯一的选择是在单独的线程上调度多个调用 . 唉, Reactive streams are sequential at their ends 但好消息是 the flatMap() operator can introduce concurrency at their core .

    必须在流构造中构建并发,通常使用flatMap()运算符 . 这个功能强大的运算符可以配置为在内部为flatMap()嵌入的Function <T,R>提供多线程上下文 . 该上下文由多线程调度程序(如Scheduler.io或Scheduler.computation)提供 .

    有关RxJava2 SchedulersConcurrency的文章中的更多详细信息,您可以在其中找到有关如何顺序和同时使用调度程序的代码示例和详细说明 .

    希望这可以帮助,

    Softjake

  • 2

    很棒的问题,我认为文档可以提供更多细节 .

    • io() 由无限制的线程池支持,这是你在CPU上施加很大负担的事情 . 因此,与文件系统的交互,与不同主机上的数据库或服务的交互就是很好的例子 .

    • computation() 由有界线程池支持,其大小等于可用处理器的数量 . 如果你试图在多个可用处理器之间并行安排cpu密集型工作(例如使用 newThread() ),那么当线程争夺处理器时,你就会面临线程创建开销和上下文切换开销,并且它可能会受到很大的性能影响 .

    • 最好将 computation() 留给CPU密集型工作,否则您将无法获得良好的CPU利用率 .

    • 由于2中讨论的原因,将 io() 称为计算工作是不好的. io() 是无界的,如果你在 io() 上并行安排了一千个计算任务,那么这千个任务中的每一个都将拥有自己的线程并争夺CPU产生的上下文转移成本 .

相关问题