首页 文章

如何取消Java 8可完成的未来?

提问于
浏览
14

我正在玩Java 8可完成的期货 . 我有以下代码:

CountDownLatch waitLatch = new CountDownLatch(1);

CompletableFuture<?> future = CompletableFuture.runAsync(() -> {
    try {
        System.out.println("Wait");
        waitLatch.await(); //cancel should interrupt
        System.out.println("Done");
    } catch (InterruptedException e) {
        System.out.println("Interrupted");
        throw new RuntimeException(e);
    }
});

sleep(10); //give it some time to start (ugly, but works)
future.cancel(true);
System.out.println("Cancel called");

assertTrue(future.isCancelled());

assertTrue(future.isDone());
sleep(100); //give it some time to finish

使用runAsync我计划执行等待锁存器的代码 . 接下来我取消了未来,期望被抛入中断的异常 . 但似乎线程在await调用上仍然被阻塞,即使未来被取消(断言传递),也不会抛出InterruptedException . 使用ExecutorService的等效代码按预期工作 . 它是CompletableFuture中的错误还是我的示例中的错误?

4 回答

  • 12

    显然,这是故意的 . 方法CompletableFuture::cancel的Javadoc声明:

    [Parameters:] mayInterruptIfRunning - 此值在此实现中无效,因为中断不用于控制处理 .

    有趣的是,方法ForkJoinTask::cancel对参数mayInterruptIfRunning使用几乎相同的措辞 .

    我猜这个问题:

    • 中断旨在与阻塞操作一起使用,例如睡眠,等待或I / O操作,

    • 但CompletableFuture和ForkJoinTask都不打算与阻塞操作一起使用 .

    CompletableFuture应该创建一个新的CompletionStage,而不是阻塞,而cpu绑定任务是fork-join模型的先决条件 . 因此,使用其中任何一个中断都会失败他们的目的 . 而另一方面,它可能会增加复杂性,如果按预期使用则不需要 .

  • 5

    当您调用 CompletableFuture#cancel 时,您只会停止链的下游部分 . 上游部分,i . 即最终会调用 complete(...)completeExceptionally(...) 的东西,不会得到任何不再需要结果的信号 .

    什么是'上游'和'下游'的东西?

    我们考虑以下代码:

    CompletableFuture
            .supplyAsync(() -> "hello")               //1
            .thenApply(s -> s + " world!")            //2
            .thenAccept(s -> System.out.println(s));  //3
    

    在这里,数据从上到下流动 - 从供应商创建,到功能修改,再到 println 消费 . 上述特定步骤的部分称为上游部分,下部部分称为下游部分 . E. g . 步骤1和2是步骤3的上游 .

    这是幕后发生的事情 . 这不是精确的,而是一个方便的思维模型 .

    • 正在执行供应商(步骤1)(在JVM的通用 ForkJoinPool 内) .

    • 然后供应商的结果由 complete(...) 传递到下一个 CompletableFuture 下游 .

    • 收到结果后, CompletableFuture 调用下一步 - 一个函数(步骤2),该函数接收上一步结果并返回将进一步传递的内容到下游 CompletableFuturecomplete(...) .

    • 收到第2步结果后,步骤3 CompletableFuture 调用消费者 System.out.println(s) . 消费者完成后,下游 CompletableFuture 将收到它的值, (Void) null

    正如我们所看到的,这个链中的每个 CompletableFuture 都必须知道下游有谁在等待将值传递给他们的 complete(...) (或 completeExceptionally(...) ) . 但是上游(或上游 - 可能有几个) .

    因此, calling cancel() upon step 3 doesn't abort steps 1 and 2 ,因为从步骤3到步骤2没有链接 .

    假设你正在使用 CompletableFuture ,那么你的步数就足够小了,如果执行几个额外的步骤就没有坏处 .

    如果要将取消传播到上游,您有两种选择:

    • 自己实现 - 创建一个专用的 CompletableFuture (名称就像 cancelled ),每个步骤后检查一次(类似于 step.applyToEither(cancelled, Function.identity())

    • 使用反应堆栈,如RxJava 2,ProjectReactor / Flux或Akka Streams

  • 2

    您需要CompletionStage的替代实现来完成真正的线程中断 . 我刚刚发布了一个小型图书馆,正是出于这个目的 - https://github.com/vsilaev/tascalate-concurrent

  • 0

    CancellationException是内部ForkJoin取消例程的一部分 . 检索未来结果时会出现异常:

    try { future.get(); }
          catch (Exception e){
              System.out.println(e.toString());            
          }
    

    花一点时间在调试器中看到这个 . JavaDoc对于正在发生的事情或您应该期待的事情并不清楚 .

相关问题