KMQ:基于Apache Kafka的可靠性消息队列

21-06-14 banq

当从Apache Kafka接收消息时,只能确认所有消息的处理达到给定的偏移量。由于这种机制,如果出现任何问题并且我们的处理组件出现故障,重新启动后它将从最后提交的偏移量开始处理。

但是,在某些情况下,您真正​​需要的是选择性消息确认,正如“传统”消息队列RabbitMQ或ActiveMQ一样:也就是说,我们要一个一个地确认消息的处理。例如,当与外部系统集成时,这可能很有用,其中每条消息对应于一个外部调用并且可能会失败。

如果消息在配置的时间段内未得到确认,则会重新发送并重试处理。这正是Amazon SQS 的工作方式。

这种行为也可以在 Kafka 之上实现,这就是kmq所做的。它使用一个额外的markers主题,需要跟踪处理已经开始和结束的消息。

 

KMQ

使用kmq您可以确认 Kafka 中单个消息的处理,并在超时后重新发送未确认的消息。这与通常的 Kafka 偏移提交机制形成对比,使用该机制您可以仅确认给定偏移量以内的所有消息。

确认机制使用一个marker主题,该主题应该具有与“主”数据主题(称为queue主题)相同数量的分区。标记主题用于通过为每条消息编写开始/结束标记来跟踪已处理的消息。

处理消息的流程如下:

  1. 从queue主题中批量读取消息
  2. 为每条消息向markers主题写一个start标记,消息会等待等到这个标记被写入
  3. 向queue主题提交最大的消息偏移量
  4. 处理消息
  5. 对于每条消息,写一个end标记。无需等到标记写入。

这确保至少处理每条消息一次。请注意,可以从不同的线程、服务器或应用程序对每条消息分别、乱序地确认每条消息(写入 end标记)。

 

使用的应用程序kmq应包含以下组件:

  • 一些RedeliveryTrackers。该组件使用marker主题并在适当时重新传递消息。应在群集中启动多个副本以进行故障转移。使用自动分区分配。
  • 将数据发送queue到要处理的主题的组件
  • 队列客户端,自定义或使用 KmqClient

 

依赖:

<dependency>
    <groupId>com.softwaremill.kmq</groupId>
    <artifactId>core_2.12</artifactId>
    <version>0.2</version>
</dependency>

猜你喜欢