jox:Java 中的快速且可扩展的通道


Java 中的快速且可扩展的通道。设计用于与Project Loom一起使用。

受到“Kotlin 协程中的快速且可扩展的通道”论文Kotlin 实现的启发。

jox库在 Java 中实现了一种高效的Channel数据结构,该结构旨在与虚拟线程(在 Java 21 中引入)一起使用。具体来说,通道依赖阻塞操作来执行相关方之间必要的同步。

该实现遵循“Kotlin Coroutines 中的快速且可扩展的通道”论文以及Kotlin 实现。其中使用了协程和可挂起函数(这是 Kotlin 特定的功能)。协程的基本原理和用例与设计虚拟线程的基本原理和用例类似。因此,该算法可能在使用两个运行时时都能很好地工作!

与队列类似,数据可以发送到通道并从通道接收。此外,通道可能会被关闭,因为源已完成发送值,或者因为接收器处理接收到的值时出现错误。

简短演示;我们发送两个值,然后通过声明不会产生更多值来关闭通道。接收方与发送方同步工作,接收后续值,最后接收一个表示通道已完成的值:

<dependency>
    <groupId>com.softwaremill.jox</groupId>
    <artifactId>core</artifactId>
    <version>0.0.2</version>
</dependency>


import jox.Channel;

class Demo {
    public static void main(String[] args) throws InterruptedException {
        // creates a rendezvous channel (buffer of size 0—a sender & 
       
// receiver must meet to pass a value)
        var ch = new Channel<Integer>();

        Thread.ofVirtual().start(() -> {
            try {
               
// send() will block, until there's a matching receive()
                ch.send(1);
                System.out.println(
"Sent 1");
                ch.send(2);
                System.out.println(
"Sent 2");
                ch.done();
                System.out.println(
"Done");
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });

        System.out.println(
"Received: " + ch.receive());
        System.out.println(
"Received: " + ch.receive());
       
// we're using the "safe" variant which returns a value when the
       
// channel is closed, instead of throwing a ChannelDoneException
        System.out.println(
"Received: " + ch.receiveSafe());
    }
}

上例使用的是没有缓冲区的会合通道,发送方和接收方必须同步才能发送数值。缓冲通道也可以通过提供一个构造函数参数(例如 new Channel<String>(10))来创建。

为了密切关注性能,jox 包含一套jmh基准测试,在每次提交/PR 合并后运行,从而可以快速发现任何性能回归。

jox 的集合通道与 Kotlin 的集合通道(在 M1 Max MacBook Pro 上完成):

  • 在 Kotlin 中,单个发送/接收操作大约需要 108 纳秒,
  • 而在 jox 中,需要 176 纳秒。

误差:
  • Kotlin 的误差为 ±0.5 ns/op,
  • 而 jox 的误差为 ±14.9 ns/op。

这表明 jox 的性能在运行之间差异很大。