Java 中的快速且可扩展的通道。设计用于与Project Loom一起使用。
受到“Kotlin 协程中的快速且可扩展的通道”论文和Kotlin 实现的启发。
jox库在 Java 中实现了一种高效的Channel数据结构,该结构旨在与虚拟线程(在 Java 21 中引入)一起使用。具体来说,通道依赖阻塞操作来执行相关方之间必要的同步。
该实现遵循“Kotlin Coroutines 中的快速且可扩展的通道”论文以及Kotlin 实现。其中使用了协程和可挂起函数(这是 Kotlin 特定的功能)。协程的基本原理和用例与设计虚拟线程的基本原理和用例类似。因此,该算法可能在使用两个运行时时都能很好地工作!
与队列类似,数据可以发送到通道并从通道接收。此外,通道可能会被关闭,因为源已完成发送值,或者因为接收器处理接收到的值时出现错误。
简短演示;我们发送两个值,然后通过声明不会产生更多值来关闭通道。接收方与发送方同步工作,接收后续值,最后接收一个表示通道已完成的值:
<dependency> <groupId>com.softwaremill.jox</groupId> <artifactId>core</artifactId> <version>0.0.2</version> </dependency>
|
import jox.Channel;
class Demo { public static void main(String[] args) throws InterruptedException { // creates a rendezvous channel (buffer of size 0—a sender & // receiver must meet to pass a value) var ch = new Channel<Integer>();
Thread.ofVirtual().start(() -> { try { // send() will block, until there's a matching receive() ch.send(1); System.out.println("Sent 1"); ch.send(2); System.out.println("Sent 2"); ch.done(); System.out.println("Done"); } catch (InterruptedException e) { throw new RuntimeException(e); } });
System.out.println("Received: " + ch.receive()); System.out.println("Received: " + ch.receive()); // we're using the "safe" variant which returns a value when the // channel is closed, instead of throwing a ChannelDoneException System.out.println("Received: " + ch.receiveSafe()); } }
|
上例使用的是没有缓冲区的会合通道,发送方和接收方必须同步才能发送数值。缓冲通道也可以通过提供一个构造函数参数(例如 new Channel<String>(10))来创建。
为了密切关注性能,jox 包含一套jmh基准测试,在每次提交/PR 合并后运行,从而可以快速发现任何性能回归。
jox 的集合通道与 Kotlin 的集合通道(在 M1 Max MacBook Pro 上完成):
- 在 Kotlin 中,单个发送/接收操作大约需要 108 纳秒,
- 而在 jox 中,需要 176 纳秒。
误差:
- Kotlin 的误差为 ±0.5 ns/op,
- 而 jox 的误差为 ±14.9 ns/op。
这表明 jox 的性能在运行之间差异很大。