使用Kafka并行消费者提高Apache Kafka性能


与其他现代大数据平台一样,Kafka 通过将数据分区到集群中的多个节点来实现无限的水平可扩展性。对于 Kafka,这意味着每个主题都有 1 个或多个分区。

主题拥有的分区越多,并发性就越高,因此潜在的吞吐量就越高。

要在 Kafka 中实际处理消息,您需要有 1 个或多个消费者订阅一个主题。

但是有多少消费者可以订阅一个主题(在一个组中)?这个数量上限有限制吗? 

消费者可以处理来自1个或多个分区的记录。每个分区有一个消费者,有多少个消费者就有多少个分区,这样才能实现最大的吞吐量。

如果消费者是狗狗,而分区是狗碗,那么你想象一下,有哪些场景是可以顺利进行的。一只狗可以用多个碗吃饭,也可以用一个碗,但你不能让多只狗用同一个碗吃饭。你可以有比狗(消费者)更多的碗(分区),但不能有比碗(分区)更多的狗(消费者)

6只饥饿的动物和只有2个碗不是一个好主意!

线程安全和慢速消费者
Kafka消费者不是线程安全的--你不能在多个线程之间共享一个消费者实例。另一方面,Kafka生产者是线程安全的,可以在多个线程之间共享。

默认的Kafka消费者只是单线程的,所以它只能按顺序处理记录,并在成功处理记录后自动提交。如果处理时间接近于0,这样做很好,但如果处理时间较长,就会成为一个问题
这就是所谓的 "慢速消费者"。基本上,每条记录都要排队等待处理,这增加了记录的端到端延迟。  

此外,单线程的消费者不能利用被分配到多个分区(有多个碗的狗)的潜在并发性,因为来自多个分区的记录仍然是按顺序处理的,而不是并发的。

因为每个消费者都是单线程的,为了获得更高的吞吐量,唯一的解决办法是增加分区和消费者的数量。
然而,分区和消费者都会使用资源。
因为Kafka复制,太多的分区实际上会导致最终的吞吐量下降。
多个消费者组(启用高扇出架构)需要更多的分区,有大量消费者的消费者组可能需要很长的时间来重新平衡(在不做任何处理时引入不理想的延迟)。
消费者也需要内存和CPU资源,所以往往需要比Kafka集群本身更多的基础设施资源。

Little定律(Concurrency = Throughput x Time,我们以前曾用它来调整Kafka Connect)解释了消费者的数量(Concurrency),也等于最大吞吐量的分区数量,记录处理的吞吐量,以及记录的处理时间之间的关系,并且可以用来计算给定其他两个变量的任何1个变量。

太多分区消耗太多资源


如何提高Kafka的并发性?
在过去的6年中,我为博客建立了几个示范性的Kafka应用,我很早就经历了Kafka应用扩展的挑战。Kafka本身是可以大规模扩展的,但要确保你的应用是可以端到端扩展的,而且延迟也很低,就需要一些非标准的解决方案。

当你只是(显然)被太多的消费者困住,需要同样多的分区来支持消费者--可能是数以百万计的时候,会发生什么?

例如,我遇到的最早的挑战之一是建立一个物流应用(模拟用卡车运送货物到仓库和从仓库运送货物),这需要实时检查货物的多个规则。最简单的设计是让货物对象直接订阅主题,也就是说,它们与Kafka消费者紧密耦合。然而,这对性能产生了严重的影响,因为需要大量的分区来支持潜在的数百万商品/消费者。还有一个很大的扇出要求,因为一些消息必须被传递给许多消费者,需要大量的消费者组。

我想出的解决方案是将货物与Kafka消费者解耦。我使用Google Guava Event Bus在Kafka消费者和货物Goods对象之间建立了一个单独的事件通知服务。

将真正接受者(货物对象)与Kafka消费者解耦意味着你不需要那么多的Kafka分区/消费者,因为有的货物可能只有1个分区和几个消费者(取决于吞吐量的要求)。

这种将记录处理与消费者轮询循环解耦的方法也被用来改善实时异常检测应用Anomalia Machina的吞吐量和延迟。在这个应用中,Kafka消费者很 "慢",因为每读取一条记录,它都要向Cassandra写数据,从Cassandra读取大量历史数据,并运行异常检测代码。为了得到最终的结果(每天190亿次的异常检查),我最终使用了多线程,有两个线程池,一个池用于轮询循环,一个用于Cassandra客户端和检测器代码,并尝试了动态池的大小和手动池的大小。

因此,使用多线程的Kafka消费者的想法并不新鲜,但要想正确地调整,并使其对不同的应用程序有足够的通用性,是很棘手的。这就是Kafka并行消费者出现的地方。  

并行消费者
正如我们到目前为止所看到的,每个默认的Kafka消费者都是单线程的,因此并发性有限,处理时间长的记录会导致延迟增加,耽误其他记录。

因此,一个明显的解决方案是使用多个线程池将轮询循环与处理循环解耦
池的大小也可以配置,以便尽可能快地从Kafka中读取记录,但也可以同时处理,从而提高吞吐量和减少延迟。

然而,有一些复杂的问题,包括确保偏移提交仍然得到正确的处理,并防止消费者因提交时间的增加而超时。

一个潜在的解决方案是Kafka Parallel Consumer,,它拥有Apache 2.0许可。它有多个组件(和线程池)来为你处理所有这些。

但是,消息的排序呢?默认情况下,Kafka保证消息按分区排序,这也是默认的并发单位。你如何提高消费者端并发性,并且仍然保留排序保证?
基本上,并行消费者允许你为不同的并发级别(最大线程)和排序模式进行配置。
这个想法是,每个并发的 "单元 "可以有一个专门的消费者线程,从而仍然保证排序和最大限度的并发(即消费者、分区、键、记录)。  

并行消费者的主要好处是,每个消费者都是多线程的,所以你可以用更少的消费者做更多的事情(也需要更少的分区),也就是说,按照最小吞吐量到最大吞吐量的顺序:

Default Consumer <= Parallel Consumer (Partition Order) <= Parallel Consumer (Key Order) <= Parallel Consumer (Unordered, Per Record)


Kafka并行消费者允许你选择不同的消息顺序--狗也可以用不同的方式排序,例如,按身高、年龄、可爱程度等。

第 2 部分中,我们将展示一些 Kafka Parallel Consumer 示例代码,慢消费者行为的痕迹,理论上如何实现 100 万 TPS,一些实验结果,Parallel Consumer 还有哪些有趣的地方,最后考虑是否应该使用它是否在生产中。