conveyor:反应式流处理的Java库


数据处理可以分为多个同时进行。这有利于 IO 密集型工作的高效流水线化,与对java.util.stream等CPU 密集型工作的关注相辅相成。

Conveyor 在很大程度上受到 Akka Streams 以及其他响应式 API(如 RxJava 和 RxJS)以及相关 API(如 Kotlin Flows、Go Channels 和 Java 自己的 BlockingQueues)的影响。

Conveyor 与 Reactive Streams 的许多理念相同,但它是针对线程众多、阻塞成本低廉的新世界而重新设计的。这种转变使得基于阻塞推拉的接口更易于理解、实现和调试,因为调用栈与您的实现和使用密切相关。

该库尤其依赖于 Java 21 的预览功能StructuredTaskScope。

StepSource是我们可以从中获取Pull元素的

Iterator<Integer> iter = List.of(1, 2, 3).iterator();
Belt.StepSource<Integer> source = () -> iter.hasNext() ? iter.next() : null;
source.pull(); // 1
source.pull();
// 2
source.pull();
// 3
source.pull();
// null

StepSink是我们可以Push元素的

Belt.StepSink<Integer> sink = element -> { System.out.println(element); return true; };
sink.push(1); // true
sink.push(2);
// true
sink.push(3);
// true

可以将StepSource连接到  StepSink,这将返回 Station。

Belt.StepSource<Integer> source = Belts.iteratorSource(List.of(1, 2, 3).iterator());
Belt.StepSink<Integer> sink = element -> { System.out.println(element); return true; };
Belt.Station station = source.andThen(sink); // Alternatively: sink.compose(source);

Station做一件事run,它会l从StepSource pull数据push到StepSink:

station.run(executor);
// Prints:
// 1
// 2
// 3

通过向执行器提交任务来异步完成其工作。
通常,在这里使用每个任务线程执行器是一个好主意,以避免当我们开始将站链接在一起时出现死锁。每个站都将在自己的线程中运行。

Executor executor = runnable -> scope.fork(Executors.callable(runnable, null));
// Or, equivalently:
Executor executor = Belts.scopeExecutor(scope);

这种方法的另一个好处是scope.join()提供了一种等待所有站点完成的方法。