向您展示一些可以非常方便地使用Java Stream流的场景以及如何使用它们的示例。 本文基于标准Java库java.util.stream。它既与反应流无关,也与诸如Vavr之类的其他流实现无关。另外,我将不介绍诸如并行执行之类的流的高级细节。 首先,让我们简要讨论与集合相比独特的流功能。尽管存在一些相似之处,但差异是很大的,您不应将流仅视为库中的另一种集合。 根据java.util.stream 的文档,最重要的功能是:
- 没有存储空间,可能是无限制的 -集合是现成的数据结构,而流表示产生数据的能力,通常在创建流时甚至不存在。由于不存储流中的数据,因此我们可以创建几乎不确定的流,或者可以更实际地对其重新措辞,我们可以让消费者决定要从流中读取多少个元素,从生产者的角度来看,它可能是不确定的(例如new Random().ints())。
- 懒惰加载 —在定义流时暂停许多操作(例如过滤,映射),并且仅在使用者决定使用流中的数据时才执行
- 本质上是实用的 -由于您已经具有使用流的经验,因此您可能会注意到处理流中的数据是为每个步骤(例如过滤器或映射)创建新流,而不是修改源数据
- 消耗性 -您只能读取一次流,然后与可以多次读取的集合不同,它变为“消耗性”
处理大量数据 假设,我们必须将数据从外部服务复制到我们的数据库中。要复制的数据量可以任意大。我们无法获取所有数据,无法将其存储在一个集合中,然后保存在数据库中,因为这可能会耗尽堆内存。我们必须分批处理数据,并设计外部服务客户端和数据库存储之间的接口。由于流不存储日期,因此可以使用它安全地处理所需的数据量。 在示例(及以下所有示例)中,我们将使用java.util.stream.Stream接口的静态方法来构建流。用Java构建流的最强大,最灵活的方法是实现Spliterator接口,然后使用StreamSupport类将其包装为流。但是,正如我们所看到的,Stream在许多情况下,接口中的静态工厂方法就足够了。 假定一个简单的API从支持分页的外部服务(例如,REST服务,数据库)中获取数据。该API最多可limit从提取项目offset。迭代地使用API,我们可以根据需要获取尽可能多的数据。
interface ExternalService { List<String> fetch(int offset, int limit); } |
现在,我们可以使用API提供数据流,并将API的使用者与分页API隔离开:
class Service<T> { private final ExternalService<T> externalService; public Stream<T> stream(int size, int batchSize) { var cursor = new Cursor(); return Stream .generate(() -> next(cursor, size, batchSize)) .takeWhile(not(List::isEmpty)) .flatMap(List::stream); } private List<T> next(Cursor cursor, int size, int batchSize) { var fetchSize = Math.min(size - cursor.offset, batchSize); var result = externalService.fetch(cursor.offset, fetchSize); cursor.inc(result.size()); return result; } } |
private static class Cursor { private int offset; void inc(int by) { offset += by; } } |
我们使用Stream.generate()方法构建无限流,其中每个元素由流提供者创建。流元素是从REST API获取的页面List
快速访问(不完整)数据 假设我们有一个耗时的操作,必须对数据的每个元素执行该操作,并且计算要花费时间t。对于n元素,使用者必须等待t * n才能接收到计算结果。例如,如果用户正在等待带有计算结果的表,则可能是一个问题。我们希望在显示第一结果时立即显示它们,而不是等待所有结果的计算并立即提交表。
public class Producer1 { private Stream<String> buildStream() { return Stream.of("a", "b", "c"); } private String expensiveStringDoubler(String input) { return input + input; } public Stream<String> stream() { return buildStream().map(this::expensiveComputation); } } |
消费者:
stream().forEach(System.out::println) |
输出:
Processing of: a aa Processing of: b … |
输出:
Processing of: a aa Processing of: b … |
如我们所见,在开始处理下一个元素之前,用户可以使用第一个元素“ aa ”的处理结果,但是计算仍然是流的生产者责任。换句话说,消费者决定何时以及是否应该执行计算,但是生产者仍然负责如何执行计算。 您可能会认为这很容易,并且不需要流。当然,您是对的,让我们看一下:
public class Producer1Classic { public List<String> data() { return List.of("a", "b", "c", "d", "e", "f"); } public String expensiveStringDoubler(String input) { return input + input; } } |
消费者:
var producer = new Producer1Classic(); for (String element : producer.data()) { System.out.println(producer.expensiveComputation(element)); } |
同样的效果,但是实际上我们已经重新发明了轮子,我们的实现模仿了stream的祖先- Iterator并且我们失去了stream的API的优势。
避免过早计算 再次假设我们要对每个流元素执行耗时的操作。在某些情况下,API的使用者无法提前说出需要多少数据。例如:
- 用户取消了数据加载
- 在数据处理过程中发生错误,无需处理其余数据
- 消费者读取数据直到满足条件,例如第一个正值
private Stream<Double> buildStream() { return new Random().doubles().boxed(); } private Double expensiveComputation(Double input) { return input / 2; } public Stream<Double> stream() { return buildStream().map(this::expensiveComputation); } |
消费者:
stream().peek(System.out::println).filter(value -> value > 0.4).findFirst(); |
在该示例中,使用者读取数据,直到该值大于0.4。生产者并不了解消费者的这种逻辑,但它只计算必要的项目。逻辑(例如条件)可以在用户端独立更改。
API易于使用 使用流而不是自定义API设计还有另一个原因。流是标准库的一部分,并为许多开发人员所熟知。在我们的API中使用流使其他开发人员更容易使用该API。
其他注意事项 错误处理 传统的错误处理不适用于Streams。由于实际处理将推迟到需要时进行,因此构造流时不会引发异常。基本上,我们有两个选择:
- 引发RuntimeException-终止方法(例如forEach)将引发异常
- 将元素包装到一个对象中,该对象表示正在处理的元素的当前状态,例如TryVavr库中的特殊类(博客中的详细信息)
private Stream<Double> buildStream() { return new Random().doubles().boxed(); } private Double expensiveComputation(Double input) { if (input > 0.8) throw new RuntimeException("Data processing exception"); return input / 2; } public Stream<Double> stream() { return buildStream().map(this::expensiveComputation).onClose(()-> System.out.println("Releasing resources…")); } |
消费者:
try (Stream<Double> stream = stream()){ stream.forEach(System.out::println); } |
输出:
0.2264004802916616 0.32777949557515484 Releasing resources… Exception in thread “main” java.lang.RuntimeException: Data processing exception |
在该示例中,当发生数据处理异常时,流将通过try-with-resources语句自动关闭,并调用已注册的处理程序。在示例输出中,我们可以看到Releasing resources…处理程序打印的消息。
总结
- 流不是集合。
- 流可以帮助我们解决以下问题:*处理大量数据*快速访问(不完整的)数据*避免过早计算
- 构建流并不难。
- 我们必须注意错误处理。
- 支持资源管理。