向您展示一些可以非常方便地使用Java Stream流的场景以及如何使用它们的示例。
本文基于标准Java库java.util.stream。它既与反应流无关,也与诸如Vavr之类的其他流实现无关。另外,我将不介绍诸如并行执行之类的流的高级细节。
首先,让我们简要讨论与集合相比独特的流功能。尽管存在一些相似之处,但差异是很大的,您不应将流仅视为库中的另一种集合。
根据java.util.stream 的文档,最重要的功能是:
- 没有存储空间,可能是无限制的 -集合是现成的数据结构,而流表示产生数据的能力,通常在创建流时甚至不存在。由于不存储流中的数据,因此我们可以创建几乎不确定的流,或者可以更实际地对其重新措辞,我们可以让消费者决定要从流中读取多少个元素,从生产者的角度来看,它可能是不确定的(例如new Random().ints())。
- 懒惰加载 —在定义流时暂停许多操作(例如过滤,映射),并且仅在使用者决定使用流中的数据时才执行
- 本质上是实用的 -由于您已经具有使用流的经验,因此您可能会注意到处理流中的数据是为每个步骤(例如过滤器或映射)创建新流,而不是修改源数据
- 消耗性 -您只能读取一次流,然后与可以多次读取的集合不同,它变为“消耗性”
现在让我们看看我们可以用流解决什么问题。
处理大量数据
假设,我们必须将数据从外部服务复制到我们的数据库中。要复制的数据量可以任意大。我们无法获取所有数据,无法将其存储在一个集合中,然后保存在数据库中,因为这可能会耗尽堆内存。我们必须分批处理数据,并设计外部服务客户端和数据库存储之间的接口。由于流不存储日期,因此可以使用它安全地处理所需的数据量。
在示例(及以下所有示例)中,我们将使用java.util.stream.Stream接口的静态方法来构建流。用Java构建流的最强大,最灵活的方法是实现Spliterator接口,然后使用StreamSupport类将其包装为流。但是,正如我们所看到的,Stream在许多情况下,接口中的静态工厂方法就足够了。
假定一个简单的API从支持分页的外部服务(例如,REST服务,数据库)中获取数据。该API最多可limit从提取项目offset。迭代地使用API,我们可以根据需要获取尽可能多的数据。
interface ExternalService { |
现在,我们可以使用API提供数据流,并将API的使用者与分页API隔离开:
class Service<T> { |
Cursor 握有当前偏移量offset:
private static class Cursor { |
我们使用Stream.generate()方法构建无限流,其中每个元素由流提供者创建。流元素是从REST API获取的页面List<T>。将为每个流创建Cursor类的实例,以跟踪获取的元素的进度。
Stream.takeWhile()方法用于检测的最后一页,最后返回的数据流T,而不是List<T>。
我们使用flatMap扁平化流。尽管在某些情况下,保留批处理(例如将整个页面保存在一个事务中)可能很有用。
现在,我们可以使用Service.stream(size, batchSize)来检索任意长流,而无需任何分页API的知识(我们决定公开batchSize参数,但这是一个设计决策)。在任何时间点,内存消耗都受到批处理大小的限制。使用者可以一一处理数据,将其保存在数据库中,或者再次进行批处理(批处理大小可能不同)。
快速访问(不完整)数据
假设我们有一个耗时的操作,必须对数据的每个元素执行该操作,并且计算要花费时间t。对于n元素,使用者必须等待t * n才能接收到计算结果。例如,如果用户正在等待带有计算结果的表,则可能是一个问题。我们希望在显示第一结果时立即显示它们,而不是等待所有结果的计算并立即提交表。
public class Producer1 { |
消费者:
stream().forEach(System.out::println) |
输出:
Processing of: a |
输出:
Processing of: a |
如我们所见,在开始处理下一个元素之前,用户可以使用第一个元素“ aa ”的处理结果,但是计算仍然是流的生产者责任。换句话说,消费者决定何时以及是否应该执行计算,但是生产者仍然负责如何执行计算。
您可能会认为这很容易,并且不需要流。当然,您是对的,让我们看一下:
public class Producer1Classic { |
消费者:
var producer = new Producer1Classic(); |
同样的效果,但是实际上我们已经重新发明了轮子,我们的实现模仿了stream的祖先- Iterator并且我们失去了stream的API的优势。
避免过早计算
再次假设我们要对每个流元素执行耗时的操作。在某些情况下,API的使用者无法提前说出需要多少数据。例如:
- 用户取消了数据加载
- 在数据处理过程中发生错误,无需处理其余数据
- 消费者读取数据直到满足条件,例如第一个正值
由于流的惰性,在这种情况下可以避免一些计算。
private Stream<Double> buildStream() { |
消费者:
stream().peek(System.out::println).filter(value -> value > 0.4).findFirst(); |
在该示例中,使用者读取数据,直到该值大于0.4。生产者并不了解消费者的这种逻辑,但它只计算必要的项目。逻辑(例如条件)可以在用户端独立更改。
API易于使用
使用流而不是自定义API设计还有另一个原因。流是标准库的一部分,并为许多开发人员所熟知。在我们的API中使用流使其他开发人员更容易使用该API。
其他注意事项
错误处理
传统的错误处理不适用于Streams。由于实际处理将推迟到需要时进行,因此构造流时不会引发异常。基本上,我们有两个选择:
- 引发RuntimeException-终止方法(例如forEach)将引发异常
- 将元素包装到一个对象中,该对象表示正在处理的元素的当前状态,例如TryVavr库中的特殊类(博客中的详细信息)
资源管理
有时我们必须使用一种资源来提供流数据(例如,外部服务中的会话),并且我们想在流处理完成时将其释放。幸运的是,流实现了Autoclosable接口,我们可以在try-with-resources语句中使用流,从而使资源管理变得非常容易。我们要做的就是使用onClose方法在流中注册一个钩子。当流关闭时,该挂钩将自动被调用。
private Stream<Double> buildStream() { |
消费者:
try (Stream<Double> stream = stream()){ |
输出:
0.2264004802916616 |
在该示例中,当发生数据处理异常时,流将通过try-with-resources语句自动关闭,并调用已注册的处理程序。在示例输出中,我们可以看到Releasing resources…处理程序打印的消息。
总结
- 流不是集合。
- 流可以帮助我们解决以下问题:*处理大量数据*快速访问(不完整的)数据*避免过早计算
- 构建流并不难。
- 我们必须注意错误处理。
- 支持资源管理。