Java中用流stream计算加权平均值

在本文中,我们将探讨解决同一问题的几种不同方法 - 计算一组值的加权平均值。

什么是加权平均值?
我们通过将所有数字相加然后除以数字的数量来计算一组数字的标准平均值。例如,数字 1、3、5、7、9 的平均值将为 (1 + 3 + 5 + 7 + 9) / 5,等于 5。

当我们计算加权平均值时,我们有一组数字,每个数字都有权重:

1    10
3    20
5    30
7    50
9    40

在这种情况下,我们需要考虑权重。新的计算方法是将每个数字与其权重的乘积相加,然后除以所有权重的总和。例如,这里的平均数是((1 * 10)+(3 * 20)+(5 * 30)+(7 * 50)+(9 * 40))/(10 + 20 + 30 + 50 + 40),等于 6.2。

设置
为了这些示例,我们将进行一些初始设置。最重要的是我们需要一种类型来表示我们的权重值:

private static class Values {
    int value;
    int weight;
    public Values(int value, int weight) {
        this.value = value;
        this.weight = weight;
    }
}

在我们的示例代码中,我们还将获得一组初始值和平均值的预期结果:

private List<Values> values = Arrays.asList(
    new Values(1, 10),
    new Values(3, 20),
    new Values(5, 30),
    new Values(7, 50),
    new Values(9, 40)
);
private Double expected = 6.2;


二次计算
最明显的计算方法正如我们上面所看到的。我们可以迭代数字列表并分别对除法所需的值求和:

double top = values.stream()
  .mapToDouble(v -> v.value * v.weight)
  .sum();
double bottom = values.stream()
  .mapToDouble(v -> v.weight)
  .sum();

完成此操作后,我们的计算现在只是一个除以另一个的情况:

double result = top / bottom;

我们可以通过使用传统的for循环来进一步简化这一点,并在进行过程中进行两个求和。这里的缺点是结果不能是不可变的值:

double top = 0;
double bottom = 0;
for (Values v : values) {
    top += (v.value * v.weight);
    bottom += v.weight;
}

扩展为List列表
我们可以用不同的方式来考虑加权平均计算。我们可以扩展每个加权值,而不是计算乘积之和。例如,我们可以扩展列表以包含 10 个“1”副本、20 个“2”副本,依此类推。此时,我们可以对扩展列表进行直接平均:

double result = values.stream()
  .flatMap(v -> Collections.nCopies(v.weight, v.value).stream())
  .mapToInt(v -> v)
  .average()
  .getAsDouble();

这显然会降低效率,但也可能更清晰、更容易理解。我们还可以更轻松地对最终一组数字进行其他操作 - 例如,通过这种方式找到中位数更容易理解。

Reduce列表
我们已经看到,对乘积和权重求和比尝试展开值更有效。但是,如果我们想在一次传递中完成此操作而不使用可变值怎么办?我们可以使用Streams 中的reduce()功能来实现这一点。特别是,我们将使用它来执行加法,将运行总计收集到一个对象中。

我们想要的第一件事是一个将运行总计收集到的类:

class WeightedAverage {
    final double top;
    final double bottom;
    public WeightedAverage(double top, double bottom) {
        this.top = top;
        this.bottom = bottom;
    }
    double average() {
        return top / bottom;
    }
}

我们还包含了一个average()函数来完成我们的最终计算。现在,我们可以执行缩减操作:

double result = values.stream()
  .reduce(new WeightedAverage(0, 0),
    (acc, next) -> new WeightedAverage(
      acc.top + (next.value * next.weight),
      acc.bottom + next.weight),
    (left, right) -> new WeightedAverage(
      left.top + right.top,
      left.bottom + right.bottom))
  .average();

这看起来很复杂,所以让我们把它分成几部分。

reduce()的第一个参数是我们的身份。这是值为 0 的加权平均值。

下一个参数是一个 lambda,它采用WeightedAverage实例并将下一个值添加到其中。我们会注意到,这里的总和的计算方式与我们之前执行的方式相同。

最后一个参数是用于组合两个WeightedAverage实例的 lambda 。这对于使用reduce()的某些情况是必要的,例如我们在并行流上执行此操作。

然后, reduce()调用的结果是一个WeightedAverage实例,我们可以使用它来计算结果。

定制收集器
我们的reduce()版本当然是干净的,但它比我们的其他尝试更难理解。我们最终将两个 lambda 传递到函数中,并且仍然需要执行后处理步骤来计算平均值。

我们可以探索的最后一个解决方案是编写一个自定义收集器来封装这项工作。这将直接产生我们的结果,并且使用起来会更简单。

在编写收集器之前,我们先看一下需要实现的接口:

public interface Collector<T, A, R> {
    Supplier<A> supplier();
    BiConsumer<A, T> accumulator();
    BinaryOperator<A> combiner();
    Function<A, R> finisher();
    Set<Characteristics> characteristics();
}

这里发生了很多事情,但我们将在构建收集器时解决它。我们还将看到这种额外的复杂性如何允许我们在并行流上而不是仅在顺序流上使用完全相同的收集器。

首先要注意的是泛型类型:

  • T – 这是输入类型。我们的收集器始终需要与其可以收集的值的类型相关联。
  • R – 这是结果类型。我们的收集器总是需要指定它将产生的类型。
  • A – 这是聚合类型。这通常是收集器内部的,但对于某些函数签名是必需的。

这意味着我们需要定义一个聚合类型。这只是一种收集运行结果的类型。我们不能直接在收集器中执行此操作,因为我们需要能够支持并行流,其中可能同时发生数量未知的并行流。因此,我们定义一个单独的类型来存储每个并行流的结果:

class RunningTotals {
    double top;
    double bottom;
    public RunningTotals() {
        this.top = 0;
        this.bottom = 0;
    }
}

这是一种可变类型,但因为它的使用仅限于一个并行流,所以没关系。

现在,我们可以实现我们的收集器方法。我们会注意到大多数都返回 lambda。同样,这是为了支持并行流,其中底层流框架将根据需要调用它们的某种组合。

第一个方法是supplier()。这将构造一个新的、零的RunningTotals实例:

@Override
public Supplier<RunningTotals> supplier() {
    return RunningTotals::new;
}

接下来,我们有Accumulator()。这需要一个RunningTotals实例和下一个Values实例来处理并组合它们,从而就地更新我们的RunningTotals实例:

@Override
public BiConsumer<RunningTotals, Values> accumulator() {
    return (current, next) -> {
        current.top += (next.value * next.weight);
        current.bottom += next.weight;
    };
}

我们的下一个方法是combiner()。这需要两个来自不同并行流的RunningTotals实例,并将它们合并为一个:

@Override
public BinaryOperator<RunningTotals> combiner() {
    return (left, right) -> {
        left.top += right.top;
        left.bottom += right.bottom;
        return left;
    };
}

在这种情况下,我们正在改变我们的输入之一并直接返回它。这是完全安全的,但如果更容易的话我们也可以返回一个新实例。

仅当 JVM 决定将流处理拆分为多个并行流时才会使用此方法,这取决于多个因素。但是,我们应该实施它,以防这种情况发生。

我们需要实现的最后一个 lambda 方法是finisher()。这将获取所有值累积完毕且所有并行流合并后剩下的最终RunningTotals实例,并返回最终结果:

@Override
public Function<RunningTotals, Double> finisher() {
    return rt -> rt.top / rt.bottom;
}

我们的收集器还需要一个features()方法,该方法返回一组描述如何使用收集器的特征。 Collectors.Characteristics枚举包含三个值:
  • CONCURRENT – Accumulator()函数可以安全地从并行线程调用同一聚合实例。如果指定了这一点,则永远不会使用combiner()函数,但aggregation()函数必须格外小心。
  • UNORDERED – 收集器可以以任何顺序安全地处理底层流中的元素。如果未指定,则在可能的情况下,将以正确的顺序提供值。
  • IDENTITY_FINISH – finisher()函数直接返回其输入。如果指定了这一点,那么收集过程可能会短路此调用并直接返回值。

在我们的例子中,我们有一个UNORDERED收集器,但需要省略其他两个:

@Override
public Set<Characteristics> characteristics() {
    return Collections.singleton(Characteristics.UNORDERED);
}

我们现在准备使用我们的收集器:
double result = values.stream().collect(new WeightedAverage());

虽然编写收集器比以前复杂得多,但使用它却要容易得多。我们还可以利用并行流之类的东西,而无需额外的工作,这意味着这为我们提供了一个更易于使用且更强大的解决方案(假设我们需要重用它)。