08、Java并发 Java CompletableFuture ( 上 )

Java 中的异步计算

异步计算很难推理的,因为我们的大脑是同步的,会将任何计算看成是一系列的同步计算。

我们在实现异步计算时,往往会把回调的动作分散在代码中或者深深地嵌套在彼此内部,这种情况下,当我们需要处理其中一个步骤中可能发生的错误时,情况变得更糟。

人生的一大悲剧是,尽管 Java 5 已经看到了这种恶性循环,提供了Future 接口作为异步计算的结果,但它没有提供任何方法来组合这些计算或处理可能的错误。

直到Java 8,才引入了 CompletableFuture 类。该类不仅实现了 Future 接口,还实现了 CompletionStage 接口。此接口定义了可与异步计算步骤组合的异步计算步骤契约。

官方文档真是拗口,简单来说,CompletionStage 接口规范了一个异步计算步骤如何与另一个异步计算步骤组合。

CompletableFuture 类还是一个集大成者,即是一个构建块,也是一个框架,提供了大约 50 种不同的方法来构造,组合,执行异步计算步骤和处理错误。

API数量如此之多,第一眼看到简直就傻眼了,不过好在它们可以分门别类,因为它们大多属于几个明确且不同的用例。

将 CompletableFuture 当作简单的 Future 使用

为什么可以 ?

因为CompletableFuture 类实现了 Future 接口,因此我们可以将其用作 Future 实现,但需要自己实现额外的完成逻辑。

例如,我们可以使用无任何参数的构造函数来创建此类的实例,用于表示未来的某些结果,然后将其交给使用者,并在将来的某个时间调用 complete() 方法完成。消费者可以使用 get() 方法来阻止当前线程,直到提供此结果。

public Future<String> calculateAsync() throws InterruptedException {
    CompletableFuture<String> completableFuture 
      = new CompletableFuture<>();

    Executors.newCachedThreadPool().submit(() -> {
        Thread.sleep(500);
        completableFuture.complete("Hello");
        return null;
    });

    return completableFuture;
}

上面的实例中,我们创建了一个创建 CompletableFuture 实例的方法,把计算分离到另一个线程中并立即返回 Future。当计算完成后,该方法通过将结果提供给 complete() 方法来完成 Future。

为了分离计算,我们使用了前几章节 一文秒懂 Java 线程池 ( Thread Pool ) (上) 中提到的 Executor API。这种创建和完成 CompletableFuture 的方法可以与任何并发机制或 API( 包括原始线程 )一起使用。

请注意,calculateAsync() 方法返回 Future 实例。

接下来,我们只要调用此方法,接收 Future 实例并在我们准备阻塞结果时调用它的 get() 方法。

Future<String> completableFuture = calculateAsync();

// ... 

String result = completableFuture.get();
assertEquals("Hello", result);

注意:get() 方法会抛出一些已检查的异常,即 ExecutionException( 封装计算期间发生的异常 )和 InterruptedException( 表示执行方法的线程被中断的异常 )。

如果你已经知道计算的结果,可以将表示此计算的结果作为参数传递给 completedFuture() 静态方法,这样,Future 的 get() 方法永远不会阻塞,而是立即返回此结果。

Future<String> completableFuture = 
  CompletableFuture.completedFuture("Hello");

// ...

String result = completableFuture.get();
assertEquals("Hello", result);

当然了,有时候,你可能希望取消 Future 的执行。

假设我们没有找到结果并决定完全取消异步执行,这可以通过调用 Future 的 cancel() 方法完成。此方法接收一个布尔参数 mayInterruptIfRunning。

public Future<String> calculateAsyncWithCancellation() throws InterruptedException {
    CompletableFuture<String> completableFuture = new CompletableFuture<>();

    Executors.newCachedThreadPool().submit(() -> {
        Thread.sleep(500);
        completableFuture.cancel(false);
        return null;
    });

    return completableFuture;
}

在上面这个异步方法的修改版本的范例中,当我们使用 Future.get() 方法阻塞结果时,如果 future 取消了,那么将抛出CancellationException 异常。

但是,在类型为 CompletableFuture 的情况下,cancel() 方法没有任何效果,因为 CompletableFuture 并不会响应中断也不会处理中断。

用于封装计算逻辑的 CompletableFuture

上面讲解的这些代码,都允许我们选择任何并发执行机制,但是,如果我们想跳过这个样板代码并简单地异步执行一些代码呢?

CompletableFuture 的静态方法 runAsync() 和 supplyAsync() 允许我们从 Runnable 和 Supplier 中创建 CompletableFuture 实例。 Runnable 和 Supplier 都是功能接口,由 Java 8 的新功能,可以将它们的实例作为lambda 表达式传递:

  • Runnable 接口与线程中使用的旧接口相同,不允许返回值
  • Supplier 接口是一个通用的功能接口,只有一个没有参数的方法,并返回一个参数化类型的值

下面的代码演示了如何将 Supplier 的实例作为 lambda 表达式参数,该表达式执行计算并返回结果

CompletableFuture<String> future
  = CompletableFuture.supplyAsync(() -> "Hello");

// ...

assertEquals("Hello", future.get());

处理异步计算的结果

处理计算结果的最通用方法是将其提供给函数。CompletableFuture.thenApply() 方法就是这样做的: 接受一个 Function 实例,用它来处理结果并返回一个用于保存函数返回的值 Future

CompletableFuture<String> completableFuture
  = CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<String> future = completableFuture
  .thenApply(s -> s + " World");

assertEquals("Hello World", future.get());

如果你不需要在 Future 链中返回值,则可以使用 Consumer 功能接口的实例。它只有一个方法,该方法接受一个参数并返回 void。而相应的,CompletableFuture 也提供了一个使用 Consumer 实例的方法 thenAccept() 。该方法接收一个 Consumer 并将其传递给计算结果。

CompletableFuture<String> completableFuture
  = CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<Void> future = completableFuture
  .thenAccept(s -> System.out.println("Computation returned: " + s));

future.get();

上面这个示例中,最后的 future.get() 方法会返回空值 Void 。

最后,如果你既不需要计算的值也不想在链的末尾返回一些值,那么你可以将 Runnable lambda 传递给 thenRun() 方法。

CompletableFuture<String> completableFuture 
  = CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<Void> future = completableFuture
  .thenRun(() -> System.out.println("计算完成"));

future.get();

上面这个示例中,调用 future.get() 方法之后会在控制台打印一行 计算完成