如何对Kafka 中的消息实现优先分级?


果您了解内部原理,那么您可能想知道如何在 Kafka 中实现消息优先级。因为,就其工作方式而言,Kafka 无法直接实现此用例。如果你真的是 Kafka 的新手,那么一定要坚持到文章结束。我将尽力分享我对 Kafka 的工作原理以及在 Kafka 中实现消息优先级的各种方法的知识。
你们中的一些人可能将 Kafka 视为一个消息队列系统。相信我,我最初也是这样做的。但是,这并不完全正确。Kafka 是一个可水平扩展的消息流平台。它主要用于流处理用例。此外,在基于发布者-订阅者模式(通常称为发布-订阅模型)或事件驱动架构设计的应用程序中用作消息总线。
 
Kafka基础知识

  • Kafka Brokers:Kafka Brokers是一个单独的 Kafka 节点,持有主题和分区。在集群中,多个 Kafka 代理共享负载并在其中一个代理出现故障时启用弹性。
  • Zookeeper: Zookeeper 充当 Kafka 集群的服务发现。它与各种 Kafka 代理进行协调,并在创建、删除新主题、代理加入集群以及代理出现故障时使它们保持同步。此外,当节点出现故障时,有助于领导者选举。
  • 主题Topic:主题类似于消息队列。它保存与给定主题相关的消息。所有消息要么发布到给定主题,要么从给定主题消费。一个给定的主题可以有多个生产者,一个给定的主题可以有多个消费者。每个主题都是一个单一的、逻辑上分布的消息提交日志。创建主题时也可以提供复制因子。这决定了主题下每个分区的复制。复制因子最多可以等于集群中 Kafka 代理的数量。这可以提高弹性,以防 Kafka 代理出现故障。
    主题的复制因子 ≤ 集群中的 Kafka broker 数
  • 分区:一个主题被分成多个分区。创建主题时可以指定分区数。此外,可以修改现有主题的编号。分区只不过是物理存储在文件系统中的仅附加文件。您可以将其视为一个日志文件,其中每一行都是一条消息,并且该文件只会附加在每条传入消息上。
    这些消息是不可变的。意思是,一旦消息被发送到一个主题,你基本上不能去编辑或删除特定的消息。在中间插入消息也是不可能的,因为它是一个仅附加文件。这意味着在 Kafka 中重新排序消息是不可能的。

现在您可能理解了为什么优先级不是 Kafka 的内置功能背后的真正原因。但是,我们可以通过多种方式实现它。
 
回到分区。发送到主题的每条消息都将在内部仅发送到其一个分区。在从主题消费时,消费者有责任从各个分区消费。这意味着消息的 FIFO 排序仅在分区级别得到保证,而不能在主题级别得到保证。
  • 消息: Kafka 期望在我们发送到主题的每条消息中都有一个可选的密钥。该键将被散列以识别消息将落入的分区。确保选择合适的字段作为键。如果选择不当,那么一些分区可能会保持空白,而其他分区则充满了大量消息。如果未提供密钥,则 Kafka 基于循环方式将其路由到分区。此外,消息在各自的分区中最多保留 7 天。可以通过修改retention.ms给定主题的保留配置来修改此保留时间。
    Partition Number = hash(Key) %(主题可用的分区数)
  • 消费者和消费者组:由于 Kafka 不是消息队列系统,消费者必须选择是从给定主题的最早偏移量还是最新偏移量读取。这可以使用 Kafka 客户端上的属性进行设置,auto.offset.reset。这为我们提供了消费者在第一次启动时的控制,是从为主题保留的第一条消息开始消费,还是从收到的最新消息开始消费。Kafka 维护每个消费者的偏移量以进行分区。消费者客户端将选择异步或同步提交在收到每组消息后。消费者客户端甚至可以选择在处理完消息后提交,允许 Kafka 代理在客户端处理失败的情况下重新发送消息。为了允许从消费者角度为给定主题进行负载共享,Kafka 允许使用消费者组。消费者组是属于同一服务的一组实例的唯一标识。即使给定组的所有消费者都出现故障,Kafka 也会存储消费者组的偏移量。这允许消费者组在重新上线时从它停止的地方开始处理。
  • 消费者群体如何分担负载?在 消费者组中,每个消费者至少分配一个分区,这样就不会有两个消费者正在消费同一个分区。确保使用的消费者数量小于或等于主题中提供的分区。如果消费者的数量等于分区,那么每个消费者将被分配一个分区。所以,如果消费者的数量超过分区的数量,那么他们将无限挨饿,造成资源浪费。
    注意:消费者的分区分配默认由 Kafka 自己管理。因此,消费者群体客户不必担心相同的问题。当消费者组中的消费者出现故障时,它正在侦听的分区将在同一消费者组中的其他消费者(如果有)之间共享。

 
Kafka 中消息的优先级排序方法
  • #1 蛮力方式:

解决此问题的最简单方法是为每个优先级创建单独的主题。从生产者的角度来看,我们可以根据优先级逻辑编写一个发布到各自主题的逻辑。从消费者的角度,我们可以写一段代码,先监听优先级最高的topic,一直处理到没有消息为止。然后,我们可以回退到较低优先级的队列等等。
在此GitHub 存储库中查看上述实现的代码。
使用一个PostConstruct方法初始化了类,该方法将在启动期间由 Spring 创建对象后执行。如果你不明白这一点。它与构造函数非常相似。在这种情况下,一旦对象被初始化,这个方法就会被触发并执行。我们正在List<TopicConsumer>以高优先级主题消费者首先出现的方式初始化,然后是中,然后是低。
作为下一步。我已经创建了一个使用者,它将在应用程序准备就绪时启动。
在这里,我们以一种方式无限地消费,如果有任何消息,我们每次都会检查最高优先级的主题。这样,可以从消费者的角度实施优先级排序。
这个实现是有问题的。如果高优先级主题总是有一些消息,那么消费者将不会消费其他优先级主题。但是在Flipkart 的 Incubator下的priority-kafka-client 中有更好的类似逻辑的实现。请查看他们的文档!
 
  • #2 Resequencer 模式:

这是一种消息路由设计模式,有助于解决我们的优先级问题。重新排序器是一个自定义组件,它接收可能未按顺序到达的消息流。它有一个内部缓冲区来存储一组消息,还具有对消息进行排序并将其发布到输出通道的逻辑。
与第一种方法不同,我们在这里使用单个主题。这种模式可以通过在发布者和消费者之间引入一个服务来实现。此服务使用包含要确定优先级的消息的 Kafka 主题。该服务负责处理预定义的消息缓冲区。当消息数量达到给定容量时,让应用排序。如果我们没有收到所需的消息容量,还必须支持超时。应用排序后,缓冲区中的所有消息都将发布到传出通道。这个传出频道主题可以被我们的实际消费者消费,它首先期望优先消息。
我已经利用Spring Boot Apache Camel 的 Resequence来实现以下 Kafka 集成。
以在此GitHub 存储库 中找到此代码实现。
我们需要实现一个ExpressionResultComparator,以后可以在创建 Camel Route 时使用它。如果您对此完全陌生。别担心,只关注比较方法。我们只是获取消息并根据 String 的compareTo方法进行比较。这意味着,无论我们发送什么,我们都希望输出按字母顺序排列。
这种模式不是可用于确定消息优先级的最佳解决方案。根据模式,重新排序器应该收集消息,直到达到某个条件。当缓冲区达到最大容量或指定的时间用完时,可以满足此条件。这意味着在实际消费者端接收优先级消息可能会有延迟。此外,如果我们希望此解决方案高效,那么我们需要最大化缓冲区大小并减少超时。但这并不是提高效率的唯一因素。它还取决于传入的消息吞吐量。这是可以预测的,但我们不能确定这个因素。
 
  • #3 桶优先模式

这是迄今为止可用的最佳实现。此模式取决于主题分区。该模式通过在称为桶的给定主题分区上创建抽象来解决优先级问题。桶可能包含一个或多个分区。桶的优先级直接取决于落入该桶的分区数。分区数越多,bucket 的优先级就越高。从基础知识,我们知道一个分区只能有一个消费者组中的一个消费者。这意味着每个存储桶的并行消费者数量直接取决于该存储桶中可用的分区数量。由于最高优先级获得最多的分区,因此它也获得了最多的消费者。
与其他两种方法不同,这种模式允许消费者在给定时间内消费所有优先级消息。只有给定优先级的消费者数量会发生变化。
代码GitHub repository.
可以使用消息中的键字段将其路由到特定分区。从生产者的角度来看,我们可以将消息路由到给定的bucket。我们可以期望负载平衡的消费者按照各自的优先级使用它。
这是由Ricardo Ferreira实现的