Java中ExecutorService与CompletableFuture指南

在本教程中,我们将探讨两个重要的 Java 类,用于处理需要并发运行的任务:ExecutorService和CompletableFuture。我们将学习它们的功能以及如何有效地使用它们,并且我们将了解它们之间的主要区别。

ExecutorService
ExecutorService是 Java 的java.util.concurrent包中的一个功能强大的接口,它简化了需要并发运行的任务的管理。它抽象了线程创建、管理和调度的复杂性,使我们能够专注于需要完成的实际工作。

ExecutorService提供了像submit()和execute()这样的方法 来提交我们想要并发运行的任务。然后,这些任务将排队并分配给线程池中的可用线程。如果任务返回结果,我们可以使用Future对象来检索它们。但是,  在 Future上使用get()等方法检索结果可能会阻塞调用线程,直到任务完成。

ExecutorService 专注于管理线程池和并发执行任务。它提供了创建具有不同配置的线程池的方法,例如固定大小、缓存和调度。

让我们看一个使用ExecutorService创建和维护三个线程的示例:

ExecutorService executor = Executors.newFixedThreadPool(3);
Future<Integer> future = executor.submit(() -> {
    // Task execution logic
    return 42;
});

newFixedThreadPool (3)方法调用创建一个包含三个线程的线程池,确保并发执行的任务不会超过三个。然后使用submit ()方法提交一个任务在线程池中执行,返回一个代表计算结果的Future对象。

CompletableFuture
CompletableFuture是在 Java 8 中引入的。它专注于组合异步操作并以更具声明性的方式处理其最终结果。 CompletableFuture 充当保存异步操作的 最终结果的容器。它可能不会立即得到结果,但它提供了方法来定义当结果可用时要做什么。

与ExecutorService 检索结果可能会阻塞线程 不同 , CompletableFuture 以非阻塞方式运行。

CompletableFuture为组合异步操作提供了更高级别的抽象。它侧重于定义工作流程并处理异步任务的最终结果。

下面是一个使用SupplyAsync() 启动返回字符串的异步任务的示例:

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    return 42;
});

在此示例中,supplyAsync()启动一个异步任务,返回结果 42。


虽然ExecutorService和CompletableFuture都处理 Java 中的异步编程,但它们的用途不同。

链接异步任务
ExecutorService和CompletableFuture都提供了链接异步任务的机制,但它们采用了不同的方法。

ExecutorService
在ExecutorService中,我们通常提交任务来执行,然后使用这些任务返回的Future对象来处理依赖关系并链接后续任务。然而,这涉及到阻塞并等待每个任务完成后再继续下一个任务,这可能导致处理异步工作流的效率低下。

考虑这样的情况:我们向 ExecutorService 提交两个任务,然后使用Future对象将它们链接在一起:

ExecutorService executor = Executors.newFixedThreadPool(2);
Future<Integer> firstTask = executor.submit(() -> {
    return 42;
});
Future<String> secondTask = executor.submit(() -> {
    try {
        Integer result = firstTask.get();
        return "Result based on Task 1: " + result;
    } catch (InterruptedException | ExecutionException e) {
       
// Handle exception
    }
    return null;
});
executor.shutdown();

在此示例中,第二个任务取决于第一个任务的结果。但是,ExecutorService不提供内置链接,因此我们需要在提交第二个任务之前,通过使用get()等待第一个任务完成(这会阻塞线程)来显式管理依赖关系。

CompletableFuture
另一方面,CompletableFuture提供了一种更加简化和更具表现力的方式来链接异步任务。它使用thenApply()等内置方法简化了任务链。这些方法允许您定义一系列异步任务,其中一个任务的输出成为下一个任务的输入。

这是使用CompletableFuture 的等效示例:

CompletableFuture<Integer> firstTask = CompletableFuture.supplyAsync(() -> {
    return 42;
});
CompletableFuture<String> secondTask = firstTask.thenApply(result -> {
    return "Result based on Task 1: " + result;
});

在此示例中,thenApply()方法用于定义第二个任务,该任务取决于第一个任务的结果。当我们使用thenApply()将任务链接到CompletableFuture时,主线程不会等待第一个任务完成后再继续。它继续执行我们代码的其他部分。

错误处理
使用ExecutorService时,错误可以通过两种方式体现:

  • 提交的任务中抛出的异常:当我们尝试 在返回的Future对象上使用get()等方法检索结果时,这些异常会传播回主线程​​。如果处理不当,这可能会导致意外行为。
  • 线程池管理期间的未经检查的异常:如果在线程池创建或关闭期间发生未经检查的异常,通常是从 ExecutorService 方法本身抛出的。我们需要在代码中捕获并处理这些异常。

让我们看一个示例,突出显示潜在的问题:

ExecutorService executor = Executors.newFixedThreadPool(2);
Future<String> future = executor.submit(() -> {
    if (someCondition) {
        throw new RuntimeException("Something went wrong!");
    }
    return
"Success";
});
try {
    String result = future.get();
    System.out.println(
"Result: " + result);
} catch (InterruptedException | ExecutionException e) {
   
// Handle exception
} finally {
    executor.shutdown();
}

在此示例中,如果满足特定条件,则提交的任务将引发异常。但是,我们需要在future.get()周围使用try-catch块来捕获任务抛出的异常或使用get()检索期间抛出的异常。这种方法对于跨多个任务管理错误可能会变得乏味。

相比之下,CompletableFuture提供了一种更强大的错误处理方法,包括使用 excepting()等方法 以及在链接方法本身内处理异常。这些方法允许我们定义如何在异步工作流程的不同阶段处理错误,而不需要显式的try-catch块。

这是使用CompletableFuture进行错误处理的等效示例:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    if (someCondition) {
        throw new RuntimeException("Something went wrong!");
    }
    return
"Success";
})
.exceptionally(ex -> {
    System.err.println(
"Error in task: " + ex.getMessage());
    return
"Error occurred"; // Can optionally return a default value
});
future.thenAccept(result -> System.out.println(
"Result: " + result));

在此示例中,异步任务引发异常,并且错误在Exceptionly()回调中被捕获和处理。它在发生异常时提供默认值(“发生错误”)。

CompletableFuture提供exceptedly()、whenComplete()、在链接方法中处理

超时管理
超时管理在异步编程中至关重要,可以确保任务在指定的时间范围内完成。让我们探讨一下ExecutorService和CompletableFuture如何以不同的方式处理超时。

ExecutorService不提供内置超时功能。为了实现超时,我们需要使用Future对象并可能中断超过截止日期的任务。这种方法涉及手动协调:

ExecutorService executor = Executors.newFixedThreadPool(2);
Future<String> future = executor.submit(() -> {
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        System.err.println("Error occured: " + e.getMessage());
    }
    return
"Task completed";
});
try {
    String result = future.get(2, TimeUnit.SECONDS);
    System.out.println(
"Result: " + result);
} catch (TimeoutException e) {
    System.err.println(
"Task execution timed out!");
    future.cancel(true);
// Manually interrupt the task.
} catch (Exception e) {
   
// Handle exception
} finally {
    executor.shutdown();
}

在此示例中,我们向 ExecutorService 提交一个任务,并在使用get()方法检索结果时指定两秒的超时。如果任务完成时间超过指定的超时时间,则会引发TimeoutException 。这种方法可能容易出错,需要小心处理。

需要注意的是,虽然超时机制中断了对任务结果的等待,但任务本身将继续在后台运行,直到完成或被中断。例如,要中断 ExecutorService 中运行的任务,我们需要使用Future.cancel(true)方法。

总之:ExecutorService 使用Future.get(timeout)的手动协调和潜在的中断

在 Java 9 中,CompletableFuture提供了一种更简化的超时方法,例如completeOnTimeout()等方法。如果原始任务未在指定的超时时间内完成,则completeOnTimeout()方法将以指定的值完成CompletableFuture 。

让我们看一个示例来说明completeOnTimeout()的工作原理:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        // Handle exception
    }
    return
"Task completed";
});
CompletableFuture<String> timeoutFuture = future.completeOnTimeout(
"Timed out!", 2, TimeUnit.SECONDS);
String result = timeoutFuture.join();
System.out.println(
"Result: " + result);

在此示例中,supplyAsync()方法启动一个异步任务,该任务模拟长时间运行的操作,需要五秒钟才能完成。但是,我们使用completeOnTimeout()指定两秒的超时。如果任务在两秒内没有完成,CompletableFuture将自动完成,并显示“Timed out!”值。