基于虚拟线程的结构化并发


在本文中,我们将讨论并发系统的一些新模式,这些模式是由 Java 21 中的新虚拟线程、结构化并发 (JEP 453 )和范围值(JEP 446: Scoped Values)组成的新的结构化并发模式。

虚拟线程是在 Java 21 中引入的,作为Loom 项目的主要输出之一,并且可能与Go语言中的 goroutine 最相似。

结构化并发 (JEP 453 )和范围值(JEP 446)这两个新 API 中的第一个称为“结构化并发”。这是一个用于线程处理的 API,它提供了一种将协作任务(通常是虚拟线程)作为子任务集合进行集体考虑和管理的方法。

并发编程中的一类问题称为数据并行问题。这些问题是将相同的操作应用于大量数据,并且这些操作(或多或少)彼此独立。这是阿姆达尔定律 等事物的适用范围,阿姆达尔定律是对并行计算加速计算能力的众所周知的限制。

结构化并发
相比之下,结构化并发是为任务并行问题而设计的,其中涉及需要并行处理的不同但相关的子任务。API 与虚拟线程的密切关系意味着它主要适用于涉及一定量 I/O 的任务(尤其是对远程服务的调用)。然而,该方法对于仅(或主要)作用于内存数据的操作不太有用,因为虚拟线程将相互竞争 CPU 时间。

结构化并发任务的一般流程如下所示:

  1. 创建任务作用域范围——创建线程拥有该范围(上下文)。
  2. 在作用域内分叉并发子任务(每个子任务都是一个虚拟线程)。
  3. 作用域范围的所有者将范围(所有子任务)作为一个单元加入。
  4. Scope 的join()方法会阻塞,直到所有子任务完成。
  5. 加入后,所有者处理分叉中的任何错误并处理结果。
  6. 所有者关闭作用域范围。

案例
让我们在一个使用股票提示计算的示例中看到这一点,我们将这样定义一个记录类:

record StockTip(String symbol, double sentiment, double delta24) {}

我们假设市场对股票的态度强度 ( sentiment) 以及未来 24 小时内可能的价格变化 ( delta24) 将通过某种外部过程来计算。这些元素可能需要一些时间来计算,并且这可能涉及网络流量。

因此,我们可以使用结构化子任务来计算它们,如下所示:

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    Callable<Double> getSentiment = () -> getSentiment(s);
    Subtask<Double> fSentiment = scope.fork(getSentiment);

    Callable<Double> getDelta = () -> getDelta24(s);
    Subtask<Double> fDelta = scope.fork(getDelta);

    scope.join();
    scope.throwIfFailed();

    return new StockTip(s, fSentiment.get(), fDelta.get());
} catch (ExecutionException | InterruptedException e) {
    throw new RuntimeException(e);
}

这遵循我们上面建立的结构化并发的一般流程。请注意,关闭作用域是通过 try-with-resources 块隐式处理的 - 这会关闭作用域并等待任何落后的子任务完成。

其他几点:

  • 首先,加入子任务也可以通过调用shutdown()方法来取消。
  • 其次,还有一个定时变体join(),称为joinUntil(),它接受截止日期(作为Instant参数)。

范围作用域有两种内置关闭策略(还支持自定义关闭策略):

  • 如果其中一个子任务失败,则取消所有子任务 ( ShutdownOnFailure)
  • 如果其中一个子任务成功,则取消所有子任务 ( ShutdownOnSuccess)

我们在第一个示例中遇到了第一个内置选项,因此让我们继续介绍另一个。

第二个案例
考虑一个库方法,其中启动多个子任务(可能是同一子任务的多个副本),并且第一个结果(来自任何子任务)就可以了。这些任务相互竞争来完成,一旦第一次成功发生,其余的虚拟线程就应该关闭,所以我们应该使用ShutdownOnSuccess这样的策略:

<T> T race(List<Callable<T>> tasks, Instant deadline)
        throws InterruptedException, ExecutionException, TimeoutException {

    try (var scope = new StructuredTaskScope.ShutdownOnSuccess<T>()) {
        for (var task : tasks) {
            scope.fork(task);
        }
        return scope.joinUntil(deadline)
                    .result();  // 如果没有一个子任务成功完成,则抛出
    }
}

这有一个明显的双重性,即所有任务都必须运行完成(并且任何子任务的失败都会取消整个任务,所以我们将再次使用ShutdownOnFailure):

<T> List<T> runAll(List<Callable<T>> tasks)
        throws InterruptedException, ExecutionException {

    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        List<? extends Subtask<T>> handles =
            tasks.stream().map(scope::fork).toList();

        scope.join()
             .throwIfFailed();  // Propagate exception if any subtask fails

       
// Here, all tasks have succeeded, so compose their results
        return handles.stream().map(Subtask::get).toList();
    }
}

请注意,此版本的代码将结果重新具体化为一个List,但也可以想象一个版本具有不同的终端操作,该操作减少了结果并返回单个值。

更复杂案例
我们还可以构建更复杂的结构——我们使用分叉创建的子任务本身可以创建范围(子范围)。这自然会产生范围和子任务的树结构,当我们想要从子任务树中压缩最终值时,这非常有用。

但是,如果我们代码的要点是通过副作用进行操作,那么就可以使用 StructuredTaskScope<Void>,即使用返回 void 的任务作用域,如本例所示:

void serveScope(ServerSocket serverSocket) throws IOException, InterruptedException {
    try (var scope = new StructuredTaskScope<Void>()) {
        try {
            while (true) {
                final var socket = serverSocket.accept();
                Callable<Void> task = () -> {
                    handle(socket);
                    return null;
                };
                scope.fork(task);
            }
        } finally {
            // If there's been an error or we're interrupted, we stop accepting
            scope.shutdown();  
// Close all active connections
            scope.join();
        }
    }
}

不过,可以说使用 "fire-and-forget "模式(如 newVirtualThreadPerTaskExecutor())处理这种情况通常会更好。这里的泛型还有一些小问题,比如需要显式地返回 null。

迄今为止,我们遇到的所有模式中反复出现的一个主题是,使用这些技术需要应用设计思维以及对所解决问题的领域和上下文的了解。没有任何软件工具能百分之百准确地判断一个线程是否适合转换为虚拟线程 vthread--这是人类软件工程师的任务。

同样,将任务重组为子任务并定义相关作用域也需要程序员对领域和子任务之间的数据依赖性有充分的了解。

接下来,让我们看看我们要讨论的第二个新 API。

范围值/作用域值Scoped Values
除了结构化并发(Structured Concurrency)之外,新的范围值 API(Scoped Values API)也作为预览版出现在 Java 21 中。

作用域值对于传递事务上下文等值和其他环境上下文数据非常有用。

Scoped Values API 基于 java.lang 中的一个新类 ScopedValue<T>,它表示将一个值绑定到特定作用域内的一个变量。该值只需编写一次,然后在每个作用域内不可更改。

绑定的值可以在作用域内的任何调用链上的任何一点被检索,但只能在设置它的作用域内被检索--这提供了健壮性和一种封装形式。特别是,不需要明确地将作用域值传递到调用链的下游。

作用域值可以被视为隐形参数,它们进入每个方法,并反过来传递给它们调用的每个方法。我们有时会说它们是隐式可用的,但这种形式比 Scala 的隐式方法参数更可控(也更 Java 化)。

Scoped Values API 也可以被看作是线程本地变量的现代替代品,但它具有一些增强功能,例如不变性。这意味着没有 set() 方法可以让远处的代码更改作用域值。由于运行时可以确定作用域值不会发生变化,这也使得未来的运行时优化成为可能。

该应用程序接口的一些目标是

  • 在线程内以及与子线程共享数据
  • 值的生命周期受到控制和约束
  • 从代码结构中可见的生命周期
  • 不变性允许大量线程共享
  • 不变性和显式生命周期通常更合适

程序员无需必要放弃 ThreadLocal,但作用域值与虚拟线程模式(如 fire-and-forget 模式)结合得很好。因此,随着作用域值的采用,ThreadLocal 很有可能在几乎所有用例中逐渐被取代。

下面是Web服务器没有使用作用域 的虚拟线程代码:

 

   private volatile boolean isShutdown = false;

    void handle(Socket socket) {
        // Handle incoming request
    }

    void serveVT(ServerSocket serverSocket) throws IOException, InterruptedException {
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            try {
                while (!isShutdown) {
                    var socket = serverSocket.accept();
                    executor.submit(() -> handle(socket));
                }
            } finally {
               
// If there's been an error, or we're interrupted, we stop accepting
                executor.shutdown();
            }
        }
    }

    public shutdown() {
        isShutdown = true;
    }


让我们重写虚拟线程网络服务器,以使用作用域值:

public class ServerSV {
    public final static ScopedValue<Socket> SOCKETSV = ScopedValue.newInstance();

    void serve(ServerSocket serverSocket) throws IOException, InterruptedException {
        while (true) {
            var socket = serverSocket.accept();
            ScopedValue.where(SOCKETSV, socket)
                        .run(() -> handle());
        }
    }

    private void handle() {
        var socket = SOCKETSV.get();
        // handle incoming traffic
    }
}

请注意,handle() 方法现在不再需要参数socket,而是通过作用域值访问套接字socket--这就是我们上面讨论的隐式可用性。

这个示例非常简单,因为我们真正要做的就是用作用域值取代参数传递。但是作用域值的真正威力在于,调用链、作用域和子作用域可以任意复杂,而作用域值仍然可用。

总的来说,作用域值的目的是提供动态作用域,这是 Java 中从未有过的概念。这种作用域方法类似于其他一些语言,如 shell、Lisp 方言和 Perl。

我们可以将它与 Java 传统的作用域形式(通常称为词法作用域)进行对比。在这种情况下,变量的作用域由代码结构决定,通常由一对匹配的大括号来定义。

我们的动态作用域示例展示了关键模式的实际应用:

  • 使用static final作为作用域值的holder
  • 在类作用域中声明 ScopedValue 实例
  • 在方法中创建动态作用域(如 runWhere()
  • 使用 lambda 定义作用域主体(调用链所在的位置)

作用域值可以与结构并发性很好地交互,因为它们可以为作用域构建,然后由子作用域回弹。任何未回弹的值都将被子作用域继承。这种技术允许升级访问控制和类似模式,例如在本例中,我们将考虑两个安全访问级别:

enum SecurityLevel { USER, ADMIN }

我们将使用一个作用域值来保存当前的安全级别,另一个作用域值来保存当前的请求编号:

    private static final ScopedValue<SecurityLevel> securitySV = ScopedValue.newInstance();
    private static final ScopedValue<Integer> requestSV = ScopedValue.newInstance();

    private final AtomicInteger req = new AtomicInteger();

    public void run() {
        ScopedValue.where(securitySV, level())
                .where(requestSV, req.getAndIncrement())
                .run(() -> process());
    }

为了演示重新绑定,我们假设管理员权限不可用,因此任何使用管理员权限的尝试都将导致退回到用户权限:

    private void process() {
        var level = securitySV.get();
        if (level == SecurityLevel.USER) {
            System.out.println("User privileges granted for "+ requestSV.get() +" on: "+ Thread.currentThread());
        } else {
            System.out.println(
"Admin privileges requested for "+ requestSV.get() +" on: "+ Thread.currentThread());
            System.out.println(
"System is in lockdown. Falling back to user privileges");
            ScopedValue.where(securitySV, SecurityLevel.USER)
                    .run(() -> process());
        }
    }