解决Kafka消息丢失的一个简单办法


虽然Kafka是一个功能强大的消息系统,但由于网络问题,我们可能会遇到一些不理想的情况。我们也遇到过由于网络问题导致的事件丢失,在试图避免这些丢失的同时,我们也遇到了由于相同原因导致的不同问题。

在这篇文章中,我解释了我们如何解决在使用kafka作为生产者和消费者时遇到的问题,我们解决这些问题的结果,以及我们如何改进这些问题。

问题
作为Trendyol的交付核心团队,由于特定领域的逻辑,

  • 在处理完从主主题消耗的事件后,我们将其发布到kafka中的重试主题。
  • 通过调度器项目,我们从重试主题中获取事件,并将其发布回主主题。
  • 从主主题中消费并处理完事件后,我们将其发布到重试话题中。

这个过程一直持续到事件相关流程完成。

在生产者的实现中,当向kafka发送一个事件时,我们使用提供的回调函数,一旦执行完成就给出执行结果。当事件未能被发送到目标主题时,kafka会自动重试发送未被broker完全ack的事件。

private void sendToKafka(ProducerRecord<Object, Object> producerRecord, Object body) {
    try {
        ListenableFuture<SendResult<Object, Object>> future = kafkaTemplate.send(producerRecord);
        future.addCallback(new ListenableFutureCallback<>() {
            @Override
            public void onSuccess(SendResult<Object, Object> result) {
                return;
            }

            @Override
            public void onFailure(Throwable ex) {
                sendEventToErrorHandler(producerRecord, body);
            }
        });
    } catch (Exception e) {
        sendEventToErrorHandler(producerRecord, body);
        throw new Exception(e.getMessage());
    }
}

此时,在出错的情况下,kafka会尝试发送默认的重试次数。

在这些重试中,生产者可以成功地将事件发送到kafka。

但是,我们会得到一个超时异常,而没有得到事件被成功写入所有同步副本的响应。

在这种情况下,由于我们不确定事件是否发送到了主题,我们是通过couchbase以outbox模式发送事件到kafka。

1 个事件发生 1 个重复事件,总共有 2 个相同事件。如果在下一次重试中处理 2 个事件时继续出现相同错误,那么这次将出现 4 个相同事件。这种情况将呈指数级增长。

我们在Kafka中消费事件时使用批量消费者。默认的Kafka批量消费者的批量大小为500,我们使用默认值。我们并没有为每个事件使用和处理一个线程,而是将属于相同键的事件以列表的形式集中在一起,为每个唯一的键创建和运行一个线程。

由于特定领域的逻辑,我们不能直接获取和处理第一个具有相同键的事件。
我们需要从属于同一键的事件中处理满足特定条件的事件。

private <T> void processBatchEvent(List<ConsumerRecord<String, String>> consumerRecords, Consumer<T> consumerService, Class<T> eventClass) {
    Map<String, List<ConsumerRecord<String, String>>> eventMap = new HashMap<>();
    for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
        List<ConsumerRecord<String, String>> records = eventMap.computeIfAbsent(consumerRecord.key(), val -> new ArrayList<>());
        records.add(consumerRecord);
    }

    try {
        List<CompletableFuture<Boolean>> completableFutures = eventMap.values().stream()
                .map(eventWrappers -> {
                       return kafkaBatchConsumerService.processEvent(eventWrappers, consumerService, eventClass);
                }).collect(Collectors.toList());
        completableFutures.forEach(CompletableFuture::join);
    } catch (Exception e) {
        throw e;
    }
}

@Async("asyncExecutorForKafkaBatchConsumer")
public <T> CompletableFuture<Boolean> processEvent(List<ConsumerRecord<String, String>> records, Consumer<T> consumerService, Class<T> eventClass) {

    for (ConsumerRecord<String, String> consumerRecord : records) {
        try {
            T event = getEvent(consumerRecord, eventClass);
            
            if(domainSpecificFilter(event)) {
               consumerService.accept(event);
            } 

        } catch (Exception e) {
            produceErrorEvent(consumerRecord);
// outbox pattern error handler
        }
    }
    return CompletableFuture.completedFuture(true);
}


由于重复事件和通过单个线程处理具有相同键的事件的逻辑,有时一个线程必须处理 500 个事件。
如果我们认为一个事件的处理时间为250毫秒,那么单线程处理500个事件至少需要125秒(约2分钟)。

因为总的批量事件处理时间;

  • pod 的 CPU、内存和网络使用率高(
  • 高资源使用率导致消费者客户端崩溃,并且在 session.timeout.ms 的持续时间内无法发送心跳,然后消费者被视为死亡,其分区被重新分配并发生重新平衡。
  • 重新平衡后,消费者无法提交任何内容,会发生CommitFailedException,实际处理过的事件会被重新处理。
  • kubernetes集群中的超时
  • 导致消费者滞后
  • 重启容器/节点

解决问题
由于超时、资源占用和滞后不断增加,我们开始考虑解决由发件箱模式(Outbox Pattern)引起的重复事件。

我们不想放弃发件箱模式,因为如果放弃,我们就无法处理事件损失。

因此,我们试图消除重复事件。在所有项目中,常用的processBatchEvent和processEventmethods作为批量消费者处理事件。在processEvent方法中,我们意识到只处理第一个通过domainSpecificFilter()控件的事件就足够了,其余事件不需要处理。

因此,我们重构了该方法:

@Async("asyncExecutorForKafkaBatchConsumer")
public <T> CompletableFuture<Boolean> processEvent(List<ConsumerRecord<String, String>> records, Consumer<T> consumerService, Class<T> eventClass) {

    for (ConsumerRecord<String, String> consumerRecord : records) {
        try {
            T event = getEvent(consumerRecord, eventClass);
            
            if(domainSpecificFilter(event)) {
               consumerService.accept(event);
               break;
// the only change 
            } 

        } catch (Exception e) {
            produceErrorEvent(consumerRecord);
// outbox pattern error handler
        }
    }
    return CompletableFuture.completedFuture(true);
}


在处理完第一个通过控制的事件后,我们使用 break 命令阻止处理其余事件。

因此,我们无需重新处理每个通过控制的重复事件并将其发送到重试主题,而只需处理一个通过控制的事件并阻止处理其余事件即可解决重复事件问题。