使用Java虚拟线程实现IO高吞吐量与CPU密集型结合编程 - DZone


虚拟线程作为 Java 19 的一部分于 2022 年 9 月作为预览功能发布,它们是平台线程的轻量级版本,与传统平台线程不同,虚拟线程的内存占用很小。

  1. 虚拟线程支持为每个工作单元模型创建一个线程,无论我们要处理多少任务,虚拟线程与 I/O 请求一起大放异彩,以支持并发高吞吐量 I/O 编程。
  2. 而CPU 绑定计算则还是通过传统平台线程和线程池获得最佳 CPU 利用率:我们可以执行 CPU 密集型任务并 利用线程池 来支持并行化,使我们能够使用 CPU 多核架构来最大化速度和资源利用率。

在本文中,当我们同时需要 I/O 任务 和 资源昂贵的 CPU 密集型计算时,我们提供不同的策略。

问题
让我们考虑一个关键任务应用程序的简单示例,该应用程序监视发电厂中的不同机器并快速确定它们是否正常运行。我们假设机器有连接到网络的传感器,这些传感器提供温度、压力等读数。问题的要点是我们必须连接到这些设备,获取它们的数据读数,并快速进行计算密集型数据分析以报告系统状态。

假设我们有一个连续运行的批处理进程来检查不同机器的状态。为了简化问题,我们假设进程从文件中连续读取数据,从中获取位置和必须监视的时间间隔。

我们的任务是展示我们将如何使用虚拟线程编写这个批处理过程,并展示我们将以编程方式解决这个问题的新方法及其相对于传统方法的优势。

使用虚拟线程编程
要开始我们的批处理,我们必须读取表的条目并将它们放在条目列表 ( List<InputEntry>) 中,其中InputEntry定义为 java 记录:

record InputEntry(String url, String id, String startTime, String endTime) {}

接下来,我们要翻阅清单,分析每台机器的传感器读数:

public static void processData(List<InputEntry> inputEntries) {
    System.out.println("processSensorReadings()");
    ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
    CompletionService<String> cService = new ExecutorCompletionService<>(executor);
    for (InputEntry inputEntry : inputEntries) {
        cService.submit(() -> processSensorData(inputEntry));
    }

    int processed = 0;
    while (processed < inputEntries.size()) {
        processed++;
        try {
            Future<String> resultFuture = cService.take();
            System.out.println(
"Handle status:" + resultFuture.get());
        } catch (ExecutionException | InterruptedException e) {
            System.out.println(
"Failed to process:" + e.getMessage());
        }
    }
}

在第3行,我们使用一个新的执行器,为每个任务启动一个虚拟线程。

在第4行中,我们将这个执行器包裹在一个CompletionService中,这使得我们能够按照虚拟线程完成的顺序来获取它们所运行的任务,这一点在第10-18行中可以看到。

在第6行,我们将processSensorData()提交给ComperationService,正如我们将看到的,它将进行I/O请求,可能还有数据分析。

由于虚拟线程是一个预览功能,我们需要用--enable-preview VM参数运行Java 19。

在这个阶段,我们注意到,对于每个InputEntry,我们正在创建一个虚拟线程。这个过程与我们对传统线程的编程方式形成了鲜明的对比,我们使用线程池来重用这种昂贵的资源。这种方法的优势是显而易见的,每个工作单位有一个虚拟线程。它促进了代码的逻辑性,而且更容易维护或排除问题。

取出和处理数据
我们需要获取传感器数据来进行分析,因此我们要做以下工作。

public static String processSensorData(InputEntry inputEntry) throws IOException, InterruptedException {

    DoubleStream data = fetchSensorData(inputEntry);

    return "ID: " + inputEntry.id() + ": " + analyzeSensorData(data);
}

private static DoubleStream fetchSensorData(InputEntry inputEntry) throws MalformedURLException, InterruptedException {
    URL pwUrl = new URL(inputEntry.url() +
"/startTime/endTime");
    
// In a real application open a secure url stream and fetch the data
    
// For this example we return some random data and simulate network latencies
    Thread.sleep((long) (Math.random() * 100));
    DoubleStream data = DoubleStream.generate(() -> new Random().nextDouble()).limit(100);
    return data;
}

在这个简单的例子中,我们只从一个传感器获取数据,但在一个更现实的应用中,我们可能需要在每个虚拟线程上处理多个传感器的多个I/O操作。

在这一点上,过程是明确的--我们的虚拟线程正在获取数据,并且是并行运行的。我们接下来的问题是,我们将如何分析数据。让我们来看看我们可以采取的不同策略。

分析数据
我们假设我们要进行的数据分析是CPU密集型的。理想情况下,我们希望在同一个线程中进行这种计算,以保持每单位工作的线程范式。但是,如果分析是时间敏感的,可能有更好的方法。我们知道,对于CPU绑定的工作负载,最好是使用线程池来获得最佳性能。

在这种情况下,虚拟线程将工作委托给线程池可能是最好的。

public static int analyzeSensorData(DoubleStream data) {

    double resultMean = 0;
    // Uses ForkJoinPool to improve performance
    
// In real application do proper data analysiys
    resultMean = data.parallel().average().getAsDouble();
    return determineStatusCode(resultMean);
}

private static int determineStatusCode(double result) {
    
// In a real application determine code based on result parameters
    if (result > 0.49) {
        return 719;
// Made up error code
    } else {
        return 0;
    }
}

在这个例子中,我们使用了Java Streams,它利用ForkJoinPool来进行计算。

在第6行,我们向Stream框架发出信号,使计算并行化。我们找到了平均数,但在实际应用中,分析工作会更加复杂。

我们使用Java提供的普通ForkJoinPool,但我们将在一个更现实的场景中定义我们自己的。我们还必须确定池子的大小,以使CPU的利用率最大化。对于普通池,你可以设置系统属性:

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "10");

其中我们假设10是优化我们系统的并发线程数。

我们可以看到,按照这个路径,我们通过从虚拟线程调用线程池来优化数据分析处理。当虚拟线程完成其I/O任务时,它们将利用Java流平台来执行(或排队)ForkJoinPool中的计算。

这是一个例子;我们可以使用其他线程池来进行计算。在提供的示例代码中,我们展示了进一步的例子。

另外,如果我们需要节制虚拟线程对稀缺资源的利用,我们可以使用semaphores,正如我们在下面的代码中展示的那样:

public static Semaphore semaphoreService = new Semaphore(NUM_PERMITS); 

public static String performScarceResourceRequest() throws InterruptedException, IOException {
    
    semaphoreService.acquire();
    //perform expensive request
    
// For this example we return some random data and simulate network latencies
    Thread.sleep((long) Math.random());
    semaphoreService.release();
    return
"Requested data";
}

我们对第1行中设置在semaphore 中的NUM_PERMITS数量进行微调,以优化对资源的访问。虚拟线程必须从semaphore那里获得许可才能运行。

最后,我们涉及到虚拟线程执行的任务需要其他子任务来完成的情况。
Java 19版本也有一个支持结构化并发的structured concurrency框架。这个项目正处于孵化阶段。它为用虚拟线程运行相关任务提供了强大的支持。我们将介绍一个简单的案例,在分析来自传感器的数据之前,我们必须执行一些验证。

public static int validateAndAnalyzeSensorData(DoubleStream data) throws IOException, InterruptedException, ExecutionException {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        Future<String> validatedData = scope.fork(() -> validateData(data));
        Future<String> checkedEnvironment = scope.fork(() -> checkEnvironment(data));
        scope.join();
        scope.throwIfFailed(e -> new IOException(e));
        if (validatedData.get().equals("ok") && checkedEnvironment.get().equals("ok"))
            return analyzeSensorData(data);
        else 
            return -1;
    }
}

public static String validateData(DoubleStream data) throws IOException, InterruptedException {
    
// In a real application open a secure url stream and fetch the data
    Thread.sleep((long) Math.random());
    return
"ok";
}

public static String checkEnvironment(DoubleStream data) throws IOException, InterruptedException {
    
// In a real application open a secure url stream and fetch the data
    Thread.sleep((long) Math.random());
    return
"ok";
}

在第2行,我们使用StructuredTaskScope来协调并发运行的任务。StructuredTaskScope支持try-with-resources语句,以确保在代码的try块结束时关闭资源。

我们在第3行和第4行中运行验证,并检查调用 scope.fork() 的上下文,该上下文接收一个 Callable 作为参数,生成一个新的线程,并返回一个 Future。父任务作用域的ThreadFactory会创建新线程,所以我们才会有虚拟线程。

在第4行,我们将两个任务连接起来,第6行调用 scope.throwIfFailed() 来传播任何错误。最后,在第7行,我们知道这两个任务已经成功完成,所以我们可以继续。

由于结构并发是在孵化器模式下进行的,我们需要用参数:--add-modules=jdk.incubator.concurrent.

另一种方法是在父代的虚拟线程中串行地做这两个子任务。如果任务密切相关,并且我们想保持每个工作单位的线程范式,这可能是可取的。然而,这并不像之前的方法那样具有良好的性能。

结论
虚拟线程有望彻底改变我们完成并发 I/O 编程的方式。本教程涵盖了当我们的任务需要支持 I/O 和 CPU 密集型工作负载时处理每个工作单元线程的不同方法。
您可以在 GitHub 上找到本文中使用的源代码。