如何在 Flink 中处理数据倾斜?


数据倾斜是指数据集的不平衡分布。这种不平衡通常是通过特定指标或领域的镜头观察到的。我们可以说一个国家的人口数据集在按人口中心分组时是有偏差的(假设更多的人住在几个大城市,而其他地方的人口较少)。

这本身并不是一件坏事。大多数数据集都有不可避免的固有偏差(毕竟大多数人确实生活在大城市)。当以并行方式处理这些数据时,问题就来了,无论是在 ETL 处理应用程序中还是在数据库本身中。如果管理不当,我们最终可能会遇到少数节点处理绝大多数数据,而其余节点则保持空闲状态,几乎没有什么可处理的。

在我们的例子中,我们用来生成和存储这些每分钟聚合的 Flink 作业面临着这个确切的问题。我们聚合的一个字段严重倾斜,并且在尝试按该字段进行键控时会损害性能。

Map-Reduce Combiner 组合器模式
以更简单的方式缓解该问题的一种方法是减少我们发送到窗口函数的记录数量。这通常是通过执行预聚合来完成的,将一定数量的记录合并为一个记录,以便最终结果分散得更少,处理速度更快。

这是一个众所周知的模式,并在某些 Map-Reduce 过程中使用。它被称为 Map-Reduce Combiner 模式,在这里有更详细的解释。

Felipe Gutierrez 的文章中详细描述了他在类似问题上的经历,以及他最终如何编写自己的 Flink 运算符来实现 Map-Reduce Combiner 模式。事实证明,Flink 1.13(KDA 中截至 2022 年 10 月的最新版本)并未提供开箱即用的运算符,允许我们在应用程序中使用此模式。
因此,我们也考虑过编写自己的运算符,但最后,我们发现了一些已经完成的东西,基本上已经为我们完成了所有这些工作。

Bundle Operator
另外一个解决方案是Bundle 操作,或者说,是 Flink 1.15 版本的一部分。

Blink 是 Fl​​ink 的一个分支,由阿里巴巴的工程师打造。Blink 最初是为了满足他们的需求而创建的,现在是一个易于访问的开源分支,甚至更好:他们实现了这个Bundle Operator的一个版本。

正如该类的名称所暗示的那样,它允许我们在窗口过程中将记录捆绑在一起,并在它们达到记录限制或窗口时间(我们都可以自定义)时刷新这些捆绑包。

在我们的特定情况下,我们不想完全切换到 Blink,所以我们只是添加了一个依赖项,使我们能够使用 BundleOperator,同时仍然使用主要的 Flink 分支来完成其余部分。
BundleOperator 的工作方式是将值(在我们的例子中是事件视图)累积到单个记录(每个站点)中,直到满足捆绑的结束条件,通常是捆绑了一定数量的记录。然后,这个包被发送到第二个聚合计算最终数字并将结果发送到接收器(我们的 PostgreSQL 数据库)。

我们的解决方案
现在我们所要做的就是编写一个实现此运算符的函数,并在我们的管道中添加一个新步骤,使用此方法预先聚合记录:

val windowResult = env.addSource(source) 
.name("Messages"
.map(Message.fromLine(_)) 
.filter(m => m.typeOfEntity == “EVENT” && m.typeOfEvent == “VIEW”) 
.name(
"FilteredEvents"
.transform( 
 
"PreAggregation"
  new MapBundleOperator[(String, Instant), Int, Message, Message] ( 
   new AggregateViewsBundleFunction, 
   new CountBundleTrigger(100), 
   new KeySelector[Message,(String, Instant)] { 
    override def getKey(in: Message) = (in.site, in.timestamp) 
   } 
  ) 

.keyingBy(_.site) 
.window(TumblingEventTimeWindows.of(Time.minutes(1))) 
.aggregate(new AggregateViews) 
.name(
"ViewEventsAggregation")

现在的主要区别在于我们有使用 MapBundleOperator 和 MapBundleFunction 的 PreAgregation 步骤:

class AggregateViewsBundleFunction extends MapBundleFunction[(String, Instant), Int, Message, Message] {
 override def addInput(value: Int, input: Message): Int = value + 1
 override def finishBundle (
  buffer: java.util.Map[(String, Instant), Int],
  out: Collector[Message]
 ): Unit = {
  val outputValues = buffer.asScala.map(Message.fromMap _)
    .toSeq.sortBy(_.timestamp.toEpochMilli)(Ordering[Long].reverse)
  outputValues.foreach(out.collect)
 }
}

我们设置的记录限制(CounterTrigger中的那100条)是一个对我们有效的限制。

在低吞吐量的情况下,这个记录限制在很大程度上会被忽略,因为水印窗口(默认为200ms)几乎总是先关闭,并强制刷新捆绑的记录。

这两个参数(捆绑限制和水印窗口)都可以进行微调,以适应你通常的数据吞吐量和水印需求,最终是一个微调的问题。

详细点击标题