Java Stream和Collection比较:何时以及如何从Java API返回Stream而不是集合Collection? - TomaszKiełbowicz

20-02-12 banq

向您展示一些可以非常方便地使用Java Stream流的场景以及如何使用它们的示例。

本文基于标准Java库java.util.stream。它既与反应流无关,也与诸如Vavr之类的其他流实现无关。另外,我将不介绍诸如并行执行之类的流的高级细节。

首先,让我们简要讨论与集合相比独特的流功能。尽管存在一些相似之处,但差异是很大的,您不应将流仅视为库中的另一种集合。

根据java.util.stream 的文档,最重要的功能是:

  • 没有存储空间,可能是无限制的 -集合是现成的数据结构,而流表示产生数据的能力,通常在创建流时甚至不存在。由于不存储流中的数据,因此我们可以创建几乎不确定的流,或者可以更实际地对其重新措辞,我们可以让消费者决定要从流中读取多少个元素,从生产者的角度来看,它可能是不确定的(例如new Random().ints())。
  • 懒惰加载 —在定义流时暂停许多操作(例如过滤,映射),并且仅在使用者决定使用流中的数据时才执行
  • 本质上是实用的 -由于您已经具有使用流的经验,因此您可能会注意到处理流中的数据是为每个步骤(例如过滤器或映射)创建新流,而不是修改源数据
  • 消耗性 -您只能读取一次流,然后与可以多次读取的集合不同,它变为“消耗性”

现在让我们看看我们可以用流解决什么问题。

处理大量数据

假设,我们必须将数据从外部服务复制到我们的数据库中。要复制的数据量可以任意大。我们无法获取所有数据,无法将其存储在一个集合中,然后保存在数据库中,因为这可能会耗尽堆内存。我们必须分批处理数据,并设计外部服务客户端和数据库存储之间的接口。由于流不存储日期,因此可以使用它安全地处理所需的数据量。

在示例(及以下所有示例)中,我们将使用java.util.stream.Stream接口的静态方法来构建流。用Java构建流的最强大,最灵活的方法是实现Spliterator接口,然后使用StreamSupport类将其包装为流。但是,正如我们所看到的,Stream在许多情况下,接口中的静态工厂方法就足够了。

假定一个简单的API从支持分页的外部服务(例如,REST服务,数据库)中获取数据。该API最多可limit从提取项目offset。迭代地使用API​​,我们可以根据需要获取尽可能多的数据。

interface ExternalService {
 List<String> fetch(int offset, int limit);
}

现在,我们可以使用API​​提供数据流,并将API的使用者与分页API隔离开:

class Service<T> {
  private final ExternalService<T> externalService;
  
  public Stream<T> stream(int size, int batchSize) {
    var cursor = new Cursor();
    return Stream
      .generate(() -> next(cursor, size, batchSize))
      .takeWhile(not(List::isEmpty))
      .flatMap(List::stream);
  }

  private List<T> next(Cursor cursor, int size, int batchSize) {
    var fetchSize = Math.min(size - cursor.offset, batchSize);
    var result = externalService.fetch(cursor.offset, fetchSize);
    cursor.inc(result.size());
    return result;
  }
}

Cursor 握有当前偏移量offset:

private static class Cursor {
  private int offset;
   
  void inc(int by) {
    offset += by;
  }
}

我们使用Stream.generate()方法构建无限流,其中每个元素由流提供者创建。流元素是从REST API获取的页面List<T>。将为每个流创建Cursor类的实例,以跟踪获取的元素的进度。

Stream.takeWhile()方法用于检测的最后一页,最后返回的数据流T,而不是List<T>。

我们使用flatMap扁平化流。尽管在某些情况下,保留批处理(例如将整个页面保存在一个事务中)可能很有用。

现在,我们可以使用Service.stream(size, batchSize)来检索任意长流,而无需任何分页API的知识(我们决定公开batchSize参数,但这是一个设计决策)。在任何时间点,内存消耗都受到批处理大小的限制。使用者可以一一处理数据,将其保存在数据库中,或者再次进行批处理(批处理大小可能不同)。

快速访问(不完整)数据

假设我们有一个耗时的操作,必须对数据的每个元素执行该操作,并且计算要花费时间t。对于n元素,使用者必须等待t * n才能接收到计算结果。例如,如果用户正在等待带有计算结果的表,则可能是一个问题。我们希望在显示第一结果时立即显示它们,而不是等待所有结果的计算并立即提交表。

public class Producer1 {
  private Stream<String> buildStream() {
    return Stream.of("a", "b", "c");
  }
  
  private String expensiveStringDoubler(String input) {
    return input + input;
  }

  public Stream<String> stream() {
    return buildStream().map(this::expensiveComputation);
  }
}

消费者:

stream().forEach(System.out::println)

输出:

Processing of: a
aa
Processing of: b
…

输出:

Processing of: a
aa
Processing of: b
…

如我们所见,在开始处理下一个元素之前,用户可以使用第一个元素“ aa ”的处理结果,但是计算仍然是流的生产者责任。换句话说,消费者决定何时以及是否应该执行计算,但是生产者仍然负责如何执行计算。

您可能会认为这很容易,并且不需要流。当然,您是对的,让我们看一下:

public class Producer1Classic {
  public List<String> data() {
    return List.of("a", "b", "c", "d", "e", "f");
  }
  
  public String expensiveStringDoubler(String input) {
    return input + input;
  }
}

消费者:

var producer = new Producer1Classic();

for (String element : producer.data()) {
  System.out.println(producer.expensiveComputation(element));
}

同样的效果,但是实际上我们已经重新发明了轮子,我们的实现模仿了stream的祖先- Iterator并且我们失去了stream的API的优势。

避免过早计算

再次假设我们要对每个流元素执行耗时的操作。在某些情况下,API的使用者无法提前说出需要多少数据。例如:

  • 用户取消了数据加载
  • 在数据处理过程中发生错误,无需处理其余数据
  • 消费者读取数据直到满足条件,例如第一个正值

由于流的惰性,在这种情况下可以避免一些计算。

private Stream<Double> buildStream() {
  return new Random().doubles().boxed();
}

private Double expensiveComputation(Double input) {
  return input / 2;
}

public Stream<Double> stream() {
  return buildStream().map(this::expensiveComputation);
}

消费者:

stream().peek(System.out::println).filter(value -> value > 0.4).findFirst();

在该示例中,使用者读取数据,直到该值大于0.4。生产者并不了解消费者的这种逻辑,但它只计算必要的项目。逻辑(例如条件)可以在用户端独立更改。

API易于使用

使用流而不是自定义API设计还有另一个原因。流是标准库的一部分,并为许多开发人员所熟知。在我们的API中使用流使其他开发人员更容易使用该API。

其他注意事项

错误处理

传统的错误处理不适用于Streams。由于实际处理将推迟到需要时进行,因此构造流时不会引发异常。基本上,我们有两个选择:

  • 引发RuntimeException-终止方法(例如forEach)将引发异常
  • 将元素包装到一个对象中,该对象表示正在处理的元素的当前状态,例如TryVavr库中的特殊类(博客中的详细信息)

资源管理

有时我们必须使用一种资源来提供流数据(例如,外部服务中的会话),并且我们想在流处理完成时将其释放。幸运的是,流实现了Autoclosable接口,我们可以在try-with-resources语句中使用流,从​​而使资源管理变得非常容易。我们要做的就是使用onClose方法在流中注册一个钩子。当流关闭时,该挂钩将自动被调用。

private Stream<Double> buildStream() {
  return new Random().doubles().boxed();
}

private Double expensiveComputation(Double input) {
  if (input > 0.8) throw new RuntimeException("Data processing exception");
  return input / 2;
}

public Stream<Double> stream() {
  return buildStream().map(this::expensiveComputation).onClose(()-> System.out.println("Releasing resources…"));
}

消费者:

try (Stream<Double> stream = stream()){
  stream.forEach(System.out::println);
}

输出:

0.2264004802916616
0.32777949557515484
Releasing resources…
Exception in thread “main” java.lang.RuntimeException: Data processing exception

在该示例中,当发生数据处理异常时,流将通过try-with-resources语句自动关闭,并调用已注册的处理程序。在示例输出中,我们可以看到Releasing resources…处理程序打印的消息。

总结

  1. 流不是集合。
  2. 流可以帮助我们解决以下问题:*处理大量数据*快速访问(不完整的)数据*避免过早计算
  3. 构建流并不难。
  4. 我们必须注意错误处理。
  5. 支持资源管理。

                   

4