Jox Flows:Java结构化+虚拟线程处理速度翻倍


我们很高兴地宣布,Jox库现在有了一个名为Flows的新特性:带来了Reactive Streams丰富的并发特性,同时保持了同步Java编程的简单性!

受Scala对等库Ox科特林实现的启发,Flows为利用Java Structured Concurrency和Virtual Threads的并发环境中的数据处理提供了简单且用户友好的API。

在本文中,我将逐步介绍Flows及其可能性,从简单的同步处理开始,以更高级的异步计算结束。


什么是Flow?
单个Flow表示可以由定义的管道同步或异步处理的值流。只有在调用任何一个Flow方法时,才开始处理run*,这使得它们变得“懒惰”。

Flows非常类似于反应流,但它们的主要目标是提供一个用于定义同步和异步数据处理管道的高级API。尽管存在概念上的差异,但Flows可以被视为反应流。它们可以转换为规范和TCK兼容的Publisher,或者从规范和TCK兼容的Publisher创建。

流是在Jox的ChannelsJox的Structured Concurrency之上创建的。如果你想了解更多关于Structured Concurrency的信息,你可以看看这篇文章。

如何从Flows开始?
添加依赖项

<dependency>
    <groupId>com.softwaremill.jox</groupId>
    <artifactId>flows</artifactId>
    <version>0.4.0</version>
</dependency>



"流水线Flow编程"入门

现在咱们把工具都准备好了,来看看这个叫"Flows"的玩意儿能干啥。它就像个万能工厂,能生产各种数字流水线!

1. 基础流水线
比如这个range流水线,就像个自动数数机:

java
Flows.range(1, 4, 1)  // 从1数到4,每次+1
     .runForeach(System.out::println); // 把每个数打印出来
输出:

1
2
3
4

⚠️ 注意:这流水线是"冷"的!就像冰箱里的可乐——你不打开喝(调用run方法),它就不会动。

2. 重复使用流水线
有些流水线可以反复用,比如从列表生成的:

java
Flow flow = Flows.fromIterable(List.of(1, 2, 3));
flow.runForeach(System.out::println); // 第一次打印
flow.runForeach(System.out::println); // 再打印一次
但注意!有些流水线像一次性筷子,用完就没了。

3. 无限流水线
还能造永动机!比如这个每100ms喊口号的:

java
Flows.tick(Duration.ofMillis(100), "SML Rules!")
     .runForeach(System.out::println);
输出:

SML Rules!
SML Rules!
...(一直喊下去)
可以用take(n)让它只喊n次。

4. 自定义流水线
自己造个流水线也很简单:

java
Flows.usingEmit(emit -> {
    emit.apply(1); // 发射数字1
    emit.apply(2); // 发射数字2
}).runForeach(System.out::println);

5. 加工数据
像工厂流水线一样加工数据:
- 放大10倍
 

java
  Flows.range(1,4,1).map(i -> i*10)... // 输出10,20,30,40
 
- 过滤
 
java
  .filter(i -> i < 30) // 只留小于30的数
 

6. 多线程加速
最酷的是可以开多个"工人"并行干活:

java
.mapPar(5, i -> i*2) // 用5个线程同时计算
虽然计算顺序可能乱,但最后结果顺序不变!

7. 分组处理
像把水果分篮处理:
- 奇数放A篮,偶数放B篮
- 两篮分别用不同工人处理

8. 异常处理
流水线出错时会层层上报,记得用try-catch接住哦!

总结
这个Flows就像乐高积木:
- 可以拼出各种数据处理流水线
- 能单线程也能多线程
- 适合处理大量数据或需要并发的场景