使用Java虚拟线程实现Actor模型保护状态 - Adam


Java 19 包含Project Loom核心组件的预览:虚拟线程。我们现在可以随意创建线程,因为我们不再(或至少少得多)受它们在内存和上下文切换时间方面的成本的限制。

哪里有线程,哪里就有并发。那么这对于 JVM 上的并发性意味着什么呢?可以肯定的是,阻塞 API 将卷土重来,取代异步、基于 Future 或反应式的代码。但是到什么程度呢?这还有待观察。

但是,无论我们编写代码的方式如何(阻塞线程、使用Futures 或响应式扩展),作为一个行业,我们在处理并发时已经制定了一些最佳实践。在高层次上,这样的一种做法是避免使用共享内存协调并发运行的进程。这也意味着避免使用锁来同步对该内存的访问。

共享内存和锁的主要问题是死锁的可能性(如果锁并不总是以相同的顺序获取)。这种方法也可能导致高争用和阻塞线程:即使线程是虚拟的,它们也可能实现重要的业务功能,除非绝对必要,否则不应阻塞。最后,我们可能会遇到数据竞争,如果不是所有必需的锁都被获取或持有的时间不够长。

我们可以将设计基于消息传递,而不是共享内存和锁。这是Actor模型的基石,在Erlang和Akka中实现,但它也是 Go 的goroutines的基础。

在最初的草稿中,Project Loom 还包括一个类似 Go 的通道实现;但是,它不包含在预览中。不过,我们可能会怀疑,通道将在稍后的某个时间点添加到 Java 的 std 库中。

Actor
好消息是,使用 Loom 实现一个Actor,或者至少是一个类似Actor的抽象,变得更加容易。但首先,什么是Actor?

Actor 包含了一些(可能是可变的)状态,在按顺序处理来自其邮箱的传入消息时使用该状态。发送消息(并可选择接收回复)是与参与者交互的唯一选项。消息是逐个串行处理的,因此无法同时访问actor的状态。

将其转换为更专业的术语,参与者运行一个无限循环,该循环使用并处理来自队列的消息。以前,实现这一点需要编写一个自定义调度程序,它将许多参与者多路复用到一个有界线程池中。使用 Java 19,我们可以为每个参与者创建一个虚拟线程并依赖 JVM 的调度。

让我们看看如何使用 Loom 实现一个基本的 Actor 实现。当然,它远不及像 Akka 这样的生产级 Actor 实现,后者包括监督、错误处理、远程处理、位置透明性等功能。尽管如此,如果你有一些可变状态,它的访问应该受到保护和序列化,它可能会起作用。

使用 Loom 实现
在我们的实现中,actor 将是一对:一个队列(我们将使用 unbounded LinkedBlockingQueue)和一个从该队列消费的虚拟线程。我们至少需要两个类:Actor实现运行循环,以及ActorRef(遵循 Akka 的术语),它允许向参与者发送消息。它Actor是单线程的,ActorRef可以在多个线程之间自由共享。

我们将为所有消息定义一个基本接口,要求它们指定发送者在响应正在处理的消息时可能期望的回复类型。这与 Actor 通常的实现方式不同——使用单向开火并忘记。然而,在我们的Actor微库的简单用例中,访问回复可能很有用:

public interface Message<REPLY> {
    default Reply reply(REPLY r) {
        return new Reply(r);
    }
}

public class Reply {
    private final Object reply;
    Reply(Object reply) { this.reply = reply; }
    Object getReply() { return reply; }
}

课程更多的Reply是技术性。它旨在包装对消息的回复,以便发回。它只能使用 来创建Message.reply,它采用正确类型的参数(注意包私有构造函数)。如果我们希望将参与者的行为表示为简单的模式匹配(参见下面的示例),我们无法使用泛型来表达这一点,因为 Java 中没有流类型。

该ActorRef.send方法接受一条消息并将其放在参与者的队列中:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

public class ActorRef {
    private final LinkedBlockingQueue<PendingMessage<?>> queue;

    ActorRef(LinkedBlockingQueue<PendingMessage<?>> queue) {
        this.queue = queue;
    }

    public <R> Future<R> send(Message<R> message) {
        var future = new CompletableFuture<R>();
        queue.add(new PendingMessage<>(message, future));
        return future;
    }
}

//

import java.util.concurrent.CompletableFuture;

record PendingMessage<R>(
    Message<R> message, 
    CompletableFuture<R> future) {}

队列包含原始消息和记录CompletableFuture中捕获的用于回复消息的PendingMessage。

每个参与者仅支持特定的Message实现子集。理想情况下,我们希望将其添加为ActorRef使用泛型参数的约束。尽管如此,由于 Java 有限的泛型(或者我有限的 Java API 建模知识),这似乎是不可能的。因此,该设计并不像我们理想的那样是类型安全的。

future!?

等等,你说——Future在那里做什么?我们不是已经完成了这个包装废话吗?我们不是要到处使用阻塞代码,因为我们有轻量级线程吗?

不总是——Future仍然是一个方便的抽象!这里它捕获了一个重要属性:消息是异步处理的。这是我们有意识的决定,可能对我们的业务流程至关重要。我们可能对回复根本不感兴趣(在这种情况下,我们只是丢弃 的结果send)。或者我们可能需要稍后某个时间点的回复。最后,我们可能会将回复通过管道发送到另一个参与者的邮箱!

轻量级线程在这里给我们带来的好处是,我们可以.get()在需要结果时调用。但是对在后台运行的计算进行抽象仍然是有效的。

行为良好的Actor

为了实现一个actor,我们微库的用户需要提供它的行为:当消息到达时要做什么。这是需要实现的接口:

import java.util.concurrent.Future;

public interface ActorBehavior<MSG extends Message<?>> {
    Future<Reply> onMessage(ActorRef self, MSG message) 
        throws Exception;
}

与 不同ActorRef的是,您可以看到行为被限制为参与者处理的消息的子类型。参与者可以选择异步提供答案,例如,如果它依赖于来自另一个参与者的回复。如上所述,用户代码创建 a 的唯一方法Reply是向方法提供正确类型的值(由Message的泛型类型指定)Message.reply。

要创建一个Actor,我们有以下静态方法:

public static <MSG extends Message<?>> ActorRef create(
        ActorBehavior<MSG> behavior) {
    var queue = new LinkedBlockingQueue<PendingMessage<?>>();
    var self = new ActorRef(queue);
    var actor = new Actor(self, queue, behavior);
    Thread.startVirtualThread(actor);
    return self;
}

我们启动一个新的虚拟线程,它从创建的队列中消费消息并将关联的消息返回ActorRef给调用者。

最后,这是Actor的运行循环。我们消费队列;如果线程被中断,它会通知actor停止。否则,我们运行提供的行为:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class Actor implements Runnable {
    private final ActorRef self;
    private final LinkedBlockingQueue<PendingMessage<?>> queue;
    private final ActorBehavior<Message<?>> behavior;

    Actor(ActorRef self, LinkedBlockingQueue<PendingMessage<?>> queue, 
            ActorBehavior<?> behavior) {
        this.self = self;
        this.queue = queue;
        this.behavior = (ActorBehavior<Message<?>>) behavior;
    }

    public void run() {
        var running = true;
        while (running) {
            PendingMessage<?> pending = null;
            try {
                pending = queue.poll(1000, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                running = false;
            }

            if (pending != null) {
                try {
                    var reply = behavior
                        .onMessage(self, pending.message());
                    handleReply(pending, reply);
                } catch (Exception e) {
                    logger.error("Exception when processing: " + 
                        pending.message(), e);
                    pending.future().completeExceptionally(e);
                }
            }
        }
    }

// …
}

我们从行为中得到的回复是Future<Reply>. 完成后,我们还需要完成待处理消息的未来。我们创建一个新的虚拟线程(毕竟它们很便宜!)来阻止Future<Reply>并进一步传递结果。如果你知道flatMap——这正是我们在这里所做的:

private void handleReply(PendingMessage<?> pending, Future<Reply> reply) {
    if (reply != null) {
        Thread.startVirtualThread(() -> {
            try {
                ((CompletableFuture<Object>) pending.future())
                    .complete(reply.get().getReply());
            } catch (Exception e) {
                pending.future().completeExceptionally(e);
            }
        });
    } else pending.future().complete(null);
}

我们完成了!在大约 100 行代码中,我们创建了一个极其简化但功能强大的 Actor 实现。

Actor在行动

例如,让我们编写一个简单的计数器actor,它接受三个消息:Get、Increase和Decrease。我们将这些建模为密封接口:

sealed interface CounterMessage<R> extends Message<R> {}
record Increase(int i) implements CounterMessage<Void> {}
record Decrease(int i) implements CounterMessage<Void> {}
record Get() implements CounterMessage<Integer> {}

接下来,我们将指定actor的行为:

class CounterActorBehavior implements ActorBehavior<CounterMessage<?>> {
    int counter = 0;

    @Override
    public Future<Reply> onMessage(ActorRef self, 
            CounterMessage<?> message) {
        Reply reply = null;
        switch (message) {
            case Increase(int i) inc -> {
                System.out.println("Increase message, by: " + i);
                counter += i;
                reply = inc.reply(null);
            }
            case Decrease(int i) dec -> {
                System.out.println("Decrease message, by: " + i);
                counter -= i;
                reply = dec.reply(null);
            }
            case Get() get -> {
                System.out.println("Get message, current state: " + 
                    counter);
                reply = get.reply(counter);
            }
        }
        return CompletableFuture.completedFuture(reply);
    }

Actor 包含在可变状态 ( counter) 上,访问受到保护。保证此状态只能由单个线程访问。

的合约onMessage迫使我们返回某种回复——在这里,我们总是同步计算它。通过构造该Message.reply方法,回复是类型安全的。

最后,我们将所有内容联系在一起并进行计数:

public static void main(String args) throws Exception {
    var actor = Actor.create(new CounterActorBehavior());

    actor.send(new Increase(10));
    actor.send(new Decrease(8));
    var result = actor.send(new Get()).get();
    System.out.println("Got result: " + result);
}

我们只等待最终回复——因为我们使用 FIFO 队列,所以Get消息应该在Increase和之后处理Decrease。因此,在运行代码时,您应该会看到2.

试试看

可在 GitHub 上找到
除了反例之外,还有一个更大的例子,它为有限数量的工人实现了代理参与者(经理)。代理将等待消息与免费作品配对,因为它们变得可用。一切都以非阻塞方式发生。

陷阱

JVM 上的参与者有其陷阱。有些是 Loom 实现所独有的;由于平台和类型系统的通用性,有些是固有的。

首先,我们在actor内部进行阻塞时必须小心。使用虚拟线程,这可能比以前更成问题。请记住,如果您在 actor 的行为中调用阻塞操作,这将阻止任何消息被进一步处理。如果参与者实现的业务流程需要停止处理其他参与者的消息,这可能是可取的。但大多数情况下,您会希望异步运行阻塞操作(例如在新的虚拟线程中)并将结果作为消息返回给参与者。为此,参与者可以访问self- ActorRef。但是,我们的实现缺少 API 来优雅地表达上述内容。但是,这只是添加几个实用方法的问题。

其次(这个 hazard 与 Akka 共享),我们必须小心不要泄露 actor 的可变状态。如果参与者的内部状态是,例如,一个可变集合,在将集合发送到外部之前,我们应该总是制作一个副本。此外,我们必须小心不要从任何类型的回调或闭包中访问(读取或写入!)actor 的状态——因为这些可能是从其他线程、由其他参与者或在future完成时运行的。这可能会导致对状态的并发访问——这是我们一开始就想避免的!

未来

Actor 会成为我们在后 Loom Java 中进行并发的方式吗?还是我们会选择其他抽象,比如通道?社区会出现什么样的图书馆?

这些问题目前尚无答案,但如果您发现自己想做一些共享内存并发,可能还有其他选择。您的用例很可能需要共享内存方法,但也可能是消息传递提供了一种更简单且不易出错的解决方案。

请记住,现有的 Actor 实现使用 Java 19 可以继续正常工作。但是,创建一个非常基本的 Actor 实现也是一种选择。