数据处理可以分为多个同时进行。这有利于 IO 密集型工作的高效流水线化,与对java.util.stream等CPU 密集型工作的关注相辅相成。
Conveyor 在很大程度上受到 Akka Streams 以及其他响应式 API(如 RxJava 和 RxJS)以及相关 API(如 Kotlin Flows、Go Channels 和 Java 自己的 BlockingQueues)的影响。
Conveyor 与 Reactive Streams 的许多理念相同,但它是针对线程众多、阻塞成本低廉的新世界而重新设计的。这种转变使得基于阻塞推拉的接口更易于理解、实现和调试,因为调用栈与您的实现和使用密切相关。
该库尤其依赖于 Java 21 的预览功能StructuredTaskScope。
StepSource是我们可以从中获取Pull元素的
Iterator<Integer> iter = List.of(1, 2, 3).iterator(); |
StepSink是我们可以Push元素的
Belt.StepSink<Integer> sink = element -> { System.out.println(element); return true; }; |
可以将StepSource连接到 StepSink,这将返回 Station。
Belt.StepSource<Integer> source = Belts.iteratorSource(List.of(1, 2, 3).iterator()); |
Station做一件事run,它会l从StepSource pull数据push到StepSink:
station.run(executor); |
通过向执行器提交任务来异步完成其工作。
通常,在这里使用每个任务线程执行器是一个好主意,以避免当我们开始将站链接在一起时出现死锁。每个站都将在自己的线程中运行。
Executor executor = runnable -> scope.fork(Executors.callable(runnable, null)); |
这种方法的另一个好处是scope.join()提供了一种等待所有站点完成的方法。