Java 19 包含Project Loom核心组件的预览:虚拟线程。我们现在可以随意创建线程,因为我们不再(或至少少得多)受它们在内存和上下文切换时间方面的成本的限制。
哪里有线程,哪里就有并发。那么这对于 JVM 上的并发性意味着什么呢?可以肯定的是,阻塞 API 将卷土重来,取代异步、基于 Future 或反应式的代码。但是到什么程度呢?这还有待观察。
但是,无论我们编写代码的方式如何(阻塞线程、使用Futures 或响应式扩展),作为一个行业,我们在处理并发时已经制定了一些最佳实践。在高层次上,这样的一种做法是避免使用共享内存协调并发运行的进程。这也意味着避免使用锁来同步对该内存的访问。
共享内存和锁的主要问题是死锁的可能性(如果锁并不总是以相同的顺序获取)。这种方法也可能导致高争用和阻塞线程:即使线程是虚拟的,它们也可能实现重要的业务功能,除非绝对必要,否则不应阻塞。最后,我们可能会遇到数据竞争,如果不是所有必需的锁都被获取或持有的时间不够长。
我们可以将设计基于消息传递,而不是共享内存和锁。这是Actor模型的基石,在Erlang和Akka中实现,但它也是 Go 的goroutines的基础。
在最初的草稿中,Project Loom 还包括一个类似 Go 的通道实现;但是,它不包含在预览中。不过,我们可能会怀疑,通道将在稍后的某个时间点添加到 Java 的 std 库中。
Actor
好消息是,使用 Loom 实现一个Actor,或者至少是一个类似Actor的抽象,变得更加容易。但首先,什么是Actor?
Actor 包含了一些(可能是可变的)状态,在按顺序处理来自其邮箱的传入消息时使用该状态。发送消息(并可选择接收回复)是与参与者交互的唯一选项。消息是逐个串行处理的,因此无法同时访问actor的状态。
将其转换为更专业的术语,参与者运行一个无限循环,该循环使用并处理来自队列的消息。以前,实现这一点需要编写一个自定义调度程序,它将许多参与者多路复用到一个有界线程池中。使用 Java 19,我们可以为每个参与者创建一个虚拟线程并依赖 JVM 的调度。
让我们看看如何使用 Loom 实现一个基本的 Actor 实现。当然,它远不及像 Akka 这样的生产级 Actor 实现,后者包括监督、错误处理、远程处理、位置透明性等功能。尽管如此,如果你有一些可变状态,它的访问应该受到保护和序列化,它可能会起作用。
使用 Loom 实现
在我们的实现中,actor 将是一对:一个队列(我们将使用 unbounded LinkedBlockingQueue)和一个从该队列消费的虚拟线程。我们至少需要两个类:Actor实现运行循环,以及ActorRef(遵循 Akka 的术语),它允许向参与者发送消息。它Actor是单线程的,ActorRef可以在多个线程之间自由共享。
我们将为所有消息定义一个基本接口,要求它们指定发送者在响应正在处理的消息时可能期望的回复类型。这与 Actor 通常的实现方式不同——使用单向开火并忘记。然而,在我们的Actor微库的简单用例中,访问回复可能很有用:
public interface Message<REPLY> { |
课程更多的Reply是技术性。它旨在包装对消息的回复,以便发回。它只能使用 来创建Message.reply,它采用正确类型的参数(注意包私有构造函数)。如果我们希望将参与者的行为表示为简单的模式匹配(参见下面的示例),我们无法使用泛型来表达这一点,因为 Java 中没有流类型。
该ActorRef.send方法接受一条消息并将其放在参与者的队列中:
import java.util.concurrent.CompletableFuture; |
队列包含原始消息和记录CompletableFuture中捕获的用于回复消息的PendingMessage。
每个参与者仅支持特定的Message实现子集。理想情况下,我们希望将其添加为ActorRef使用泛型参数的约束。尽管如此,由于 Java 有限的泛型(或者我有限的 Java API 建模知识),这似乎是不可能的。因此,该设计并不像我们理想的那样是类型安全的。
future!?
等等,你说——Future在那里做什么?我们不是已经完成了这个包装废话吗?我们不是要到处使用阻塞代码,因为我们有轻量级线程吗?
不总是——Future仍然是一个方便的抽象!这里它捕获了一个重要属性:消息是异步处理的。这是我们有意识的决定,可能对我们的业务流程至关重要。我们可能对回复根本不感兴趣(在这种情况下,我们只是丢弃 的结果send)。或者我们可能需要稍后某个时间点的回复。最后,我们可能会将回复通过管道发送到另一个参与者的邮箱!
轻量级线程在这里给我们带来的好处是,我们可以.get()在需要结果时调用。但是对在后台运行的计算进行抽象仍然是有效的。
行为良好的Actor
为了实现一个actor,我们微库的用户需要提供它的行为:当消息到达时要做什么。这是需要实现的接口:
import java.util.concurrent.Future; |
与 不同ActorRef的是,您可以看到行为被限制为参与者处理的消息的子类型。参与者可以选择异步提供答案,例如,如果它依赖于来自另一个参与者的回复。如上所述,用户代码创建 a 的唯一方法Reply是向方法提供正确类型的值(由Message的泛型类型指定)Message.reply。
要创建一个Actor,我们有以下静态方法:
public static <MSG extends Message<?>> ActorRef create( |
我们启动一个新的虚拟线程,它从创建的队列中消费消息并将关联的消息返回ActorRef给调用者。
最后,这是Actor的运行循环。我们消费队列;如果线程被中断,它会通知actor停止。否则,我们运行提供的行为:
import java.util.concurrent.CompletableFuture; |
我们从行为中得到的回复是Future<Reply>. 完成后,我们还需要完成待处理消息的未来。我们创建一个新的虚拟线程(毕竟它们很便宜!)来阻止Future<Reply>并进一步传递结果。如果你知道flatMap——这正是我们在这里所做的:
private void handleReply(PendingMessage<?> pending, Future<Reply> reply) { |
我们完成了!在大约 100 行代码中,我们创建了一个极其简化但功能强大的 Actor 实现。
Actor在行动
例如,让我们编写一个简单的计数器actor,它接受三个消息:Get、Increase和Decrease。我们将这些建模为密封接口:
sealed interface CounterMessage<R> extends Message<R> {} |
接下来,我们将指定actor的行为:
class CounterActorBehavior implements ActorBehavior<CounterMessage<?>> { |
Actor 包含在可变状态 ( counter) 上,访问受到保护。保证此状态只能由单个线程访问。
的合约onMessage迫使我们返回某种回复——在这里,我们总是同步计算它。通过构造该Message.reply方法,回复是类型安全的。
最后,我们将所有内容联系在一起并进行计数:
public static void main(String args) throws Exception { |
我们只等待最终回复——因为我们使用 FIFO 队列,所以Get消息应该在Increase和之后处理Decrease。因此,在运行代码时,您应该会看到2.
试试看
可在 GitHub 上找到。
除了反例之外,还有一个更大的例子,它为有限数量的工人实现了代理参与者(经理)。代理将等待消息与免费作品配对,因为它们变得可用。一切都以非阻塞方式发生。
陷阱
JVM 上的参与者有其陷阱。有些是 Loom 实现所独有的;由于平台和类型系统的通用性,有些是固有的。
首先,我们在actor内部进行阻塞时必须小心。使用虚拟线程,这可能比以前更成问题。请记住,如果您在 actor 的行为中调用阻塞操作,这将阻止任何消息被进一步处理。如果参与者实现的业务流程需要停止处理其他参与者的消息,这可能是可取的。但大多数情况下,您会希望异步运行阻塞操作(例如在新的虚拟线程中)并将结果作为消息返回给参与者。为此,参与者可以访问self- ActorRef。但是,我们的实现缺少 API 来优雅地表达上述内容。但是,这只是添加几个实用方法的问题。
其次(这个 hazard 与 Akka 共享),我们必须小心不要泄露 actor 的可变状态。如果参与者的内部状态是,例如,一个可变集合,在将集合发送到外部之前,我们应该总是制作一个副本。此外,我们必须小心不要从任何类型的回调或闭包中访问(读取或写入!)actor 的状态——因为这些可能是从其他线程、由其他参与者或在future完成时运行的。这可能会导致对状态的并发访问——这是我们一开始就想避免的!
未来
Actor 会成为我们在后 Loom Java 中进行并发的方式吗?还是我们会选择其他抽象,比如通道?社区会出现什么样的图书馆?
这些问题目前尚无答案,但如果您发现自己想做一些共享内存并发,可能还有其他选择。您的用例很可能需要共享内存方法,但也可能是消息传递提供了一种更简单且不易出错的解决方案。
请记住,现有的 Actor 实现使用 Java 19 可以继续正常工作。但是,创建一个非常基本的 Actor 实现也是一种选择。