Apache Kafka和Spring Boot的容错和可靠消息传递 – Arnold Galovics


在过去的几年中,Kafka已经开始大幅增加其市场份额。除了微服务和消息传递之外,还有一种已经开始流行的架构模式:事件溯源。
Kafka提供了架构模式所需的属性,因此非常适合事件采购。事件源中的关键概念之一是存储不可变的事件序列(将其视为审核日志)以捕获系统状态。这样就可以在任何给定时间通过重播事件直到特定点来重新创建系统状态。当然,作为每种模式,它都有缺点。它引入了许多必须解决的难题,而在编写简单数据库的标准应用程序时,您可能会忘记这些问题。
希望实现组件之间基于容错和可靠的Kafka消息传递的通信,不可靠情况主要有两个:

  • 正在处理消息时崩溃的服务
  • 由于外部系统(例如数据库)不可用,无法处理该消息

这两种情况可以使用Kafka 自动提交功能附带的最多一次交付保证实现,但是可能会造成重复消息,这对于重视事件消息序列的事件溯源是不方便的。
这里介绍远离自动提交模式的一般原理,并保证至少一次传递消息的手工方式:
这个想法很简单。开始消耗消息时,不要立即提交读取的偏移量,而要等到处理完成后再手动提交偏移量。采用这种方法可以确保仅在通过应用程序逻辑处理消息后才认为消息已处理。这解决了发生崩溃时丢失应用程序中运行中消息的问题。
但是,要考虑的一件事。当您崩溃后重新启动应用程序时。它必须能够重新处理该消息。换句话说,消息处理应该是幂等的。想象一下,当用户向系统注册时,以及将新行插入数据库后,应用程序发生故障失败,因此不会提交偏移量。然后重新启动应用,并选择相同的消息,然后将具有相同数据的行再次成功插入数据库。
从错误中恢复的解决方案:
  1. 一种方法当使用者在处理过程中崩溃时重新传递消息;
  2. 另一种方法是在发生可恢复的错误(例如,数据库在短时间内不可用)的情况下,实现一种重试消息处理的方法。

在本文中,我想展示使用Spring Boot解决这两个问题的示例解决方案。
本文源码在GitHub上

首先,我们需要建立基础设施,即Kafka和Zookeeper。我在Windows的Docker上使用以下docker-compose.yml:

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    hostname: zookeeper
  kafka:
    image: wurstmeister/kafka
    command: [start-kafka.sh]
    ports:
      -
"9092:9092"
    hostname: kafka
    environment:
      KAFKA_CREATE_TOPICS:
"normal-topic:1:1"
      KAFKA_LISTENERS: PLAINTEXT:
//:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT:
//localhost:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      -
"zookeeper"

现在,我们将使用一个主题,具有单个复制和单个分区的normal-topic。
执行docker-compose up完之后,我们就可以使用基础架构了。
现在,让我们在Spring Boot应用程序中创建使用者。它会非常简单,相信我。我们需要一门新课,我叫它NormalTopicConsumer。现在唯一要做的是注销已读取的消息。这里还有一件事,我正在使用Lombok,因为我懒得写记录器。

@Component
@Slf4j
public class NormalTopicConsumer {
    @KafkaListener(id = "normal-topic-consumer", groupId = "normal-topic-group", topics = "normal-topic")
    public void consume(ConsumerRecord<?, ?> consumerRecord) {
        String json = consumerRecord.value().toString();
        log.info(
"Consuming normal message {}", json);
    }
}

application.properties:
spring.kafka.consumer.bootstrap-servers=localhost:9092

第一步,我们需要找出容器ID是什么,您可以使用docker ps它。然后exec放入容器:

$ docker exec -it a0a7 bash

然后进入生产者模式并发送示例JSON消息:
$ sh /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic normal-topic>{"data":"test"}

在应用程序方面,我们应该看到以下日志:
[-consumer-0-C-1] c.a.b.n.NormalTopicConsumer : Consuming normal message {"data":"test"}

太好了,让我们继续。

实施手动提交
对于手动提交模式,实现非常简单。需要更改一些配置以禁用自动提交模式并启用手动配置。然后对代码进行一些调整。
application.properties:

spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE

此处提供不同类型的确认模式。其中有2个相关。MANUAL和MANUAL_IMMEDIATE。差异在提到的页面上有所描述。这篇文章我将继续使用MANUAL_IMMEDIATE。
代码更改非常简单。我们唯一需要做的就是扩展使用者方法的参数列表,并在其中添加一个Acknowledgment参数。Spring将自动填充它。该对象提供一个确认方法,该方法手动提交读取的偏移量。该MANUAL_IMMEDIATEACK模式设置的方式,只要消费者acknowledge方法被调用时,它会立即告诉经纪人,消费者已成功处理消息。

@Component
@Slf4j
public class NormalTopicConsumer {
    @KafkaListener(id = "normal-topic-consumer", groupId = "normal-topic-group", topics = "normal-topic")
    public void consume(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
        String json = consumerRecord.value().toString();
        log.info(
"Consuming normal message {}", json);
        ack.acknowledge();
    }
}

如果您以调试模式启动应用程序,并且在acknowledge调用之前放置了一个断点。同时,您通过控制台生产者发送消息。当执行在断点处暂停并且您终止了应用程序时,偏移量尚未提交。启动该应用程序将导致重新发送相同的消息并再次对其进行重新处理。这就是我们想要实现的行为。

实施死信队列DLQ
有一个处理失败消息的行业标准,称为死信队列(DLQ)。这实际上是新主题和使用者正在做的事情,但是我们不仅将失败的消息放到主题上以在将来进行检查,还增强了它作为重试的方式。
DLQ使用者可以重新发送消息,这很好,但是我们应该能够摆脱无限重试的麻烦。因此,我们可以引入retryCount的概念,该概念告诉DLQ使用者重试特定消息多少次。如果达到阈值(例如5),它将停止重试该消息。达到阈值后,应该以自动方式或手动方式仔细检查邮件。您可以简单地记录该消息并使用Kibana,Splunk等来检测那些重试但失败的消息。您可以使用专用存储来放置这些存储,例如S3,但这只是体系结构的问题,什么适合您的情况。
有了这个功能,系统将不会无限地重试,这真棒。但是,如果您再考虑重试机制,重试时普通消费者会收到一条消息。它失败了,因为它无法访问MySQL数据库。如果应用程序立即重试,但是数据库没有恢复联机时,就显得太快。对于更智能的重试逻辑,可以在重新发送消息时引入延迟。它可以是恒定的延迟(例如5秒),也可以是指数延迟,例如从1秒开始,然后连续增加。
带有重试计数的可重试消息:

{
    "retryCount": 0,
   
"message": {
       
"data": "..."
    }
}

如果您使用的Kafka代理支持重试计数,则另一种可能性是将重试计数存储在消息标头中。在这种情况下,它是完全透明的。
当需要在DLQ使用者中重新发送消息时,它必须确定必须将消息重新发送到哪个主题。同样,这里有多种可能性,但让我提及其中两种。
  1. 一种方法是将原始主题存储在消息中,就像重试计数一样,或者利用Kafka标头。
  2. 另一种方法是将原始主题编码为DLQ主题。想象一下,DLQ使用者可以基于正则表达式收听许多主题。该规则可能非常简单,如果主题名称以-dlq结尾,则必须继续监听。如果使用这种方法,则将正常主题称为normal-topic,并且在消息处理失败的情况下,我们会将消息发送到normal-topic-dlq,DLQ使用者可以轻松推断出它需要重新发送到哪个主题。

在这里我将使用Kafka标头,因为当您不以这种详细程度污染实​​际的消息内容(有效负载payload)时,我认为它是一种更干净的实现。
在中docker-compose.yml,我将添加具有复制因子1和分区1 的新dlq-topic。

KAFKA_CREATE_TOPICS: "normal-topic:1:1,dlq-topic:1:1"

DLQ的目的是要有一个发送失败消息的地方。NormalTopicConsumer类需要被改变了一点点:

@Component
@Slf4j
public class NormalTopicConsumer {
    public static final String ORIGINAL_TOPIC_HEADER_KEY = "originalTopic";
    public static final String DLQ_TOPIC =
"dlq-topic";
 
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
 
    @KafkaListener(id =
"normal-topic-consumer", groupId = "normal-topic-group", topics = "normal-topic")
    public void consume(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
        String json = consumerRecord.value().toString();
        try {
            log.info(
"Consuming normal message {}", json);
           
// Simulating an error case
           
// throw new RuntimeException();
        } catch (Exception e) {
            log.info(
"Message consumption failed for message {}", json);
            String originalTopic = consumerRecord.topic();
            ProducerRecord<String, String> record = new ProducerRecord<>(DLQ_TOPIC, json);
            record.headers().add(ORIGINAL_TOPIC_HEADER_KEY, originalTopic.getBytes(UTF_8));
            kafkaTemplate.send(record);
        } finally {
            ack.acknowledge();
        }
    }
}

我在这里做了几件事。一种是,我已将处理逻辑放入try子句中,以便捕获由于逻辑而掉的任何错误,我们可以采取必要的步骤进行恢复。显然,目前这只是一个日志记录,并且不会很快失败,所以这就是为什么我在那儿抛出新的记录RuntimeException。只需取消注释,处理逻辑就会失败。
另一件事是catch子句。我正在创建一个ProducerRecord来保存有效负载并存储应该将其发送到哪个主题(dlq-topic)。最后但并非最不重要的一点是,在originalTopic键下将当前处理的主题名称添加为消息的标题。由于Kafka客户端API仅接受字节作为标头,因此必须首先转换String值。我知道这并不漂亮,但我们不能忍受。
而且我还添加了一个finally子句以始终提交偏移量,即使它失败了。如果处理成功完成,则因此提交。如果处理失败,则在将其发送到DLQ之后,我们仍然需要提交它,否则,使用者将收到来自代理的相同消息,并且将继续失败。那不是我们想要的。
现在,另一边是DLQ。在正常情况下,仅将失败的消息发送到DLQ就足够了,但是由于我想将其用作重试方式,因此我们需要在其中添加一些逻辑。我要为此使用其他服务。同样的交易,生成的项目就像正常主题消费者服务一样。我称它为dlq-topic-consumer。
配置与normal-topic-consumer相同:

application.properties:

spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE

一个用于消费逻辑的Spring bean DlqTopicConsumer:

@Component
@Slf4j
public class DlqTopicConsumer {
    public static final String ORIGINAL_TOPIC_HEADER_KEY = "originalTopic";
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
 
    @KafkaListener(id =
"dlq-topic-consumer", groupId = "dlq-topic-group", topics = "dlq-topic")
    public void consume(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
        String json = consumerRecord.value().toString();
        try {
            Header originalTopicHeader = consumerRecord.headers().lastHeader(ORIGINAL_TOPIC_HEADER_KEY);
            if (originalTopicHeader != null) {
                String originalTopic = new String(originalTopicHeader.value(), UTF_8);
                log.info(
"Consuming DLQ message {} from originalTopic {}", json, originalTopic);
            } else {
                log.error(
"Unable to read DLQ message because it's missing the originalTopic header");
            }
        } catch (Exception e) {
            log.error(
"Unable to process DLQ message {}", json);
        } finally {
            ack.acknowledge();
        }
    }
}
 

它只做一件事情。如果邮件具有originalTopic可用的标头,则仅将邮件与原始主题一起注销。到目前为止,这已经足够了,但是当我们遇到重试逻辑时,我们将构建一个更复杂的解决方案。
normal-topic-consumer对正常事件消息进行提取,处理失败消息,并且它将向dlq-topic发送失败的消息并提交偏移量。将消息发送到DLQ主题后,将启动dlq-topic-consumer服务。它将拾取消息并进行记录。太棒了

实施重试逻辑
它的工作方式是这样的。DLQ使用者将消息重新发送到原始主题时,将在消息上设置retryCount标头。如果传入的DLQ消息中还没有retryCount标头,它将把它设置为零。如果已经存在,它将读出来,将其递增,然后将新值设置为外发消息。另一方面,如果可用,普通主题使用者将把retryCount标头复制到DLQ消息中。
当DLQ使用者要重新发送消息时,它将根据阈值检查重试计数。在这里,我将使用5作为阈值,但是您可以使用适合您的任何值。
NormalTopicConsumer:

@Component
@Slf4j
public class NormalTopicConsumer {
    public static final String RETRY_COUNT_HEADER_KEY = "retryCount";
    public static final String ORIGINAL_TOPIC_HEADER_KEY =
"originalTopic";
    public static final String DLQ_TOPIC =
"dlq-topic";
 
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
 
    @KafkaListener(id =
"normal-topic-consumer", groupId = "normal-topic-group", topics = "normal-topic")
    public void consume(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
        String json = consumerRecord.value().toString();
        try {
            log.info(
"Consuming normal message {}", json);
            throw new RuntimeException();
        } catch (Exception e) {
            log.info(
"Message consumption failed for message {}", json);
            String originalTopic = consumerRecord.topic();
            ProducerRecord<String, String> record = new ProducerRecord<>(DLQ_TOPIC, json);
            record.headers().add(ORIGINAL_TOPIC_HEADER_KEY, originalTopic.getBytes(UTF_8));
 
            Header retryCount = consumerRecord.headers().lastHeader(RETRY_COUNT_HEADER_KEY);
            if (retryCount != null) {
                record.headers().add(retryCount);
            }
            kafkaTemplate.send(record);
        } finally {
            ack.acknowledge();
        }
    }
}

DlqTopicConsumer:
@Component
@Slf4j
public class DlqTopicConsumer {
    public static final String RETRY_COUNT_HEADER_KEY = "retryCount";
    public static final String ORIGINAL_TOPIC_HEADER_KEY =
"originalTopic";
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
 
    @KafkaListener(id =
"dlq-topic-consumer", groupId = "dlq-topic-group", topics = "dlq-topic")
    public void consume(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
        String json = consumerRecord.value().toString();
        try {
            log.info(
"Consuming DLQ message {}", json);
            Header originalTopicHeader = consumerRecord.headers().lastHeader(ORIGINAL_TOPIC_HEADER_KEY);
            if (originalTopicHeader != null) {
                String originalTopic = new String(originalTopicHeader.value(), UTF_8);
                Header retryCountHeader = consumerRecord.headers().lastHeader(RETRY_COUNT_HEADER_KEY);
                int retryCount = 0;
                if (retryCountHeader != null) {
                    retryCount = Integer.parseInt(new String(retryCountHeader.value(), UTF_8));
                }
                if (retryCount < 5) {
                    retryCount += 1;
                    log.info(
"Resending attempt {}", retryCount);
                    ProducerRecord<String, String> record = new ProducerRecord<>(originalTopic, json);
                    byte[] retryCountHeaderInByte = Integer.valueOf(retryCount).toString().getBytes(UTF_8);
                    record.headers().add(RETRY_COUNT_HEADER_KEY, retryCountHeaderInByte);
                    kafkaTemplate.send(record);
                    });
                } else {
                    log.error(
"Retry limit exceeded for message {}", json);
                }
            } else {
                log.error(
"Unable to resend DLQ message because it's missing the originalTopic header");
            }
        } catch (Exception e) {
            log.error(
"Unable to process DLQ message {}", json);
        } finally {
            ack.acknowledge();
        }
    }
}

剩下的事情就是使重试逻辑更加智能,在重发中引入一些延迟。这可以通过使用AsyncTaskExecutor来实现,我们将在另一个线程中发送消息,但会在开始时将其休眠一段时间,这里我使用了5秒的延迟。最终代码如下所示:

@Component
@Slf4j
public class DlqTopicConsumer {
    public static final String RETRY_COUNT_HEADER_KEY = "retryCount";
    public static final String ORIGINAL_TOPIC_HEADER_KEY =
"originalTopic";
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
 
    @Autowired
    private AsyncTaskExecutor asyncTaskExecutor;
 
    @KafkaListener(id =
"dlq-topic-consumer", groupId = "dlq-topic-group", topics = "dlq-topic")
    public void consume(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
        String json = consumerRecord.value().toString();
        try {
            log.info(
"Consuming DLQ message {}", json);
            Header originalTopicHeader = consumerRecord.headers().lastHeader(ORIGINAL_TOPIC_HEADER_KEY);
            if (originalTopicHeader != null) {
                String originalTopic = new String(originalTopicHeader.value(), UTF_8);
                Header retryCountHeader = consumerRecord.headers().lastHeader(RETRY_COUNT_HEADER_KEY);
                int retryCount = 0;
                if (retryCountHeader != null) {
                    retryCount = Integer.parseInt(new String(retryCountHeader.value(), UTF_8));
                }
                if (retryCount < 5) {
                    retryCount += 1;
                    log.info(
"Resending attempt {}", retryCount);
                    ProducerRecord<String, String> record = new ProducerRecord<>(originalTopic, json);
                    byte[] retryCountHeaderInByte = Integer.valueOf(retryCount).toString().getBytes(UTF_8);
                    record.headers().add(RETRY_COUNT_HEADER_KEY, retryCountHeaderInByte);
                    asyncTaskExecutor.execute(() -> {
                        try {
                            log.info(
"Waiting for 5 seconds until resend");
                            Thread.sleep(5000);
                            kafkaTemplate.send(record);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } else {
                    log.error(
"Retry limit exceeded for message {}", json);
                }
            } else {
                log.error(
"Unable to resend DLQ message because it's missing the originalTopic header");
            }
        } catch (Exception e) {
            log.error(
"Unable to process DLQ message {}", json);
        } finally {
            ack.acknowledge();
        }
    }
}
 

更新: Jakub在评论中完美地指出,如果DLQ服务在异步线程上崩溃时,等待消息重新发送,它将丢失消息。最快的解决方案是不使用异步重发模型,而是在原始线程中进行等待。但是,这会影响DLQ服务的整体吞吐量。

@Component
@Slf4j
public class DlqTopicConsumer {
    public static final String RETRY_COUNT_HEADER_KEY = "retryCount";
    public static final String ORIGINAL_TOPIC_HEADER_KEY =
"originalTopic";
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
 
    @KafkaListener(id =
"dlq-topic-consumer", groupId = "dlq-topic-group", topics = "dlq-topic")
    public void consume(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
        String json = consumerRecord.value().toString();
        try {
            log.info(
"Consuming DLQ message {}", json);
            Header originalTopicHeader = consumerRecord.headers().lastHeader(ORIGINAL_TOPIC_HEADER_KEY);
            if (originalTopicHeader != null) {
                String originalTopic = new String(originalTopicHeader.value(), UTF_8);
                Header retryCountHeader = consumerRecord.headers().lastHeader(RETRY_COUNT_HEADER_KEY);
                int retryCount = 0;
                if (retryCountHeader != null) {
                    retryCount = Integer.parseInt(new String(retryCountHeader.value(), UTF_8));
                }
                if (retryCount < 5) {
                    retryCount += 1;
                    log.info(
"Resending attempt {}", retryCount);
                    ProducerRecord<String, String> record = new ProducerRecord<>(originalTopic, json);
                    byte[] retryCountHeaderInByte = Integer.valueOf(retryCount).toString().getBytes(UTF_8);
                    record.headers().add(RETRY_COUNT_HEADER_KEY, retryCountHeaderInByte);
                    log.info(
"Waiting for 5 seconds until resend");
                    Thread.sleep(5000);
                    kafkaTemplate.send(record);
                } else {
                    log.error(
"Retry limit exceeded for message {}", json);
                }
            } else {
                log.error(
"Unable to resend DLQ message because it's missing the originalTopic header");
            }
        } catch (Exception e) {
            log.error(
"Unable to process DLQ message {}", json);
        } finally {
            ack.acknowledge();
        }
    }
}

另一种选择是为延迟的消息保留一个持久性存储。万一服务崩溃,它可以接收重新发送的消息。(mailbox邮箱模式)