CompletableFuture,Future和RxJava的Observable之间的区别

问题

我想知道CompletableFuture,FutureObservable``RxJava之间的区别。

我所知道的都是异步但是

Future.get()阻塞线程

CompletableFuture给出了回调方法

RxJava Observable ---类似于CompletableFuture以及其他好处(不确定)

例如:如果客户端需要进行多次服务调用,那么当我们使用Futures(Java)Future.get()时,将按顺序执行...想知道它在RxJava中的效果如何...

和文档http://reactivex.io/intro.htmlsays
很难使用Futures来优化组合条件异步执行流程(或者不可能,因为每个请求的延迟在运行时会有所不同)。当然,这可以完成,但它很快变得复杂(因此容易出错)或者过早地阻塞Future.get(),这消除了异步执行的好处。
真有兴趣知道如何解决这个问题。我发现从文档中很难理解。


#1 热门回答(171 赞)

期货
Futures在Java 5(2004)中引入。它们是承诺在操作完成后保留操作结果的对象。例如,当任务(即RunnableorCallable)被提交给执行者时。调用者可以使用future对象来检查操作isDone(),或者等待它完成使用get()

例:

/**
* A task that sleeps for a second, then returns 1
**/
public static class MyCallable implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        Thread.sleep(1000);
        return 1;
    }

}

public static void main(String[] args) throws Exception{
    ExecutorService exec = Executors.newSingleThreadExecutor();
    Future<Integer> f = exec.submit(new MyCallable());

    System.out.println(f.isDone()); //False

    System.out.println(f.get()); //Waits until the task is done, then prints 1
}

CompletableFutures
CompletableFutures在Java 8(2014)中引入。它们实际上是由Google的Listenable Futures启发的常规期货的演变,这是Guavalibary的一部分。它们是Futures,它还允许你将任务串联起来。你可以使用它们来告诉一些工作者线程"去做一些任务X,当你完成后,用X的结果去做其他事情"。这是一个简单的例子:

/**
* A supplier that sleeps for a second, and then returns one
**/
public static class MySupplier implements Supplier<Integer> {

    @Override
    public Integer get() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            //Do nothing
        }
        return 1;
    }
}

/**
* A (pure) function that adds one to a given Integer
**/
public static class PlusOne implements Function<Integer, Integer> {

    @Override
    public Integer apply(Integer x) {
        return x + 1;
    }
}

public static void main(String[] args) throws Exception {
    ExecutorService exec = Executors.newSingleThreadExecutor();
    CompletableFuture<Integer> f = CompletableFuture.supplyAsync(new MySupplier(), exec);
    System.out.println(f.isDone()); // False
    CompletableFuture<Integer> f2 = f.thenApply(new PlusOne());
    System.out.println(f2.get()); // Waits until the "calculation" is done, then prints 2
}

RxJava
RxJava是由Netflix创建的整个库,用于reactive programming。一目了然,它似乎与Java 8's streams相似。它是,除了它更强大。

与Futures类似,RxJava可用于将一堆同步或异步操作串联起来以产生一个或多个有意义的结果。然而,与一次性使用的Futures不同,RxJava可以处理零个或多个项目的流,包括具有无限数量项目的永无止境的流。由于令人难以置信的丰富set of operators,它也更加灵活和流畅。

与Java 8的流不同,RxJava具有abackpressure机制,允许它处理处理流的不同部分以不同的速率在不同的线程中运行的情况。

RxJava的缺点是,尽管有相当好的文档,但由于涉及范式转换,它是一个具有挑战性的库。 Rx代码也可能是调试的噩梦,特别是如果涉及多个线程,更糟糕的是 - 如果需要背压。如果你想进入它,官方网站上有各种各样的教程,还有官方的documentationJavadoc。你还可以查看一些视频,例如this one,它简要介绍了Rx,并讨论了Rx和Futures之间的差异。
奖励:Java 9 Reactive Streams
Java 9's Reactive StreamsakaFlow API是由各种reactive streams库实现的一组接口,例如RxJava 2,Akka StreamsVertx。它们允许这些反应库互连,同时保留所有重要的背压。


#2 热门回答(7 赞)

我从0.9开始使用Rx Java,现在是1.3.2,很快就会迁移到2.x我在一个已经工作了8年的私人项目中使用它。

我不会在没有这个库的情况下编程。一开始我很怀疑,但这是你需要创造的另一种完整的心态。一开始很安静。我有时会看着弹珠几个小时..哈哈

这只是一个练习的问题,并且真正了解流程(也就是可观察者和观察者的合同),一旦你到达那里,你就会讨厌这样做。

对我来说,这个图书馆并没有真正的缺点。

使用案例:我有一个监视器视图,包含9个仪表(cpu,mem,network等...)。启动视图时,视图会将自身订阅到系统监视器类,该类返回包含9米所有数据的可观察(间隔)。它将每秒推送一个新结果到视图(所以不要轮询!!!)。该observable使用flatmap同时(异步!)从9个不同的源获取数据,并将结果压缩到你的视图将在onNext()上获得的新模型。

你将如何用期货,补充等来做到这一点......祝你好运! :)

Rx Java为我解决了编程中的许多问题,并且在某种程度上更容易实现......

优点:

  • Statelss !!! (重要的是要提,最重要的可能)
  • 线程管理开箱即用
  • 构建具有自己生命周期的序列
  • 一切都是可观察的,所以链接很容易
  • 编写更少的代码
  • 类路径上的单个jar(非常轻量级)
  • 高度并发
  • 再也没有回调了
  • 基于订户(消费者和生产者之间的紧密合同)
  • 背压策略(断路器等)
  • 出色的错误处理和恢复
  • 非常好的文档(大理石<3)
  • 完全控制
  • 还有很多 ...

缺点: - 难以测试