Kafka + Reactor:背压、重试与错误处理

在本文中,我们探讨了如何使用 Reactor Kafka 创建 Kafka 消费者,重点介绍错误处理、重试和背压管理。这些技术使我们的 Kafka 消费者即使在发生故障的情况下也能保持容错和高效。

Apache Kafka 是一个流行的分布式事件流平台,与Project Reactor结合使用时,它可以构建弹性和反应性应用程序。Reactor Kafka是一个基于 Reactor 和Kafka生产者/消费者 API 构建的反应性 API。

Reactor Kafka API 使我们能够使用具有背压支持的函数性、非阻塞 API 将消息发布到 Kafka 并从 Kafka 消费消息。这意味着系统可以根据需求和资源可用性动态调整消息处理速率,从而确保高效且容错的运行。

在本教程中,我们将探索如何使用Reactor Kafka创建 Kafka 消费者,以确保容错性和可靠性。我们将深入探讨背压、重试和错误处理等关键概念,同时以非阻塞方式异步处理消息。

设置项目
首先,我们应该在项目中包含Spring Kafka和Reactor Kafka Maven 依赖项:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>io.projectreactor.kafka</groupId>
    <artifactId>reactor-kafka</artifactId>
</dependency>

反应式 Kafka 消费者设置
接下来,我们将使用 Reactor Kafka 设置 Kafka 消费者。我们将首先配置必要的消费者属性,确保它已正确设置以连接到 Kafka。然后,我们将初始化消费者,最后了解如何被动地消费消息。

1. 配置 Kafka 消费者属性
现在,让我们配置 Reactive Kafka 消费者属性。KafkaConfig配置类定义了消费者要使用的属性:

public class KafkaConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    public static Map<String, Object> consumerConfig() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ConsumerConfig.GROUP_ID_CONFIG,
"reactive-consumer-group");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return config;
    }
}

ConsumerConfig.GROUP_ID_CONFIG定义消费者组,它启用跨消费者的消息负载平衡。同一组中的所有消费者负责处理来自主题的消息。

接下来我们在实例化ReactiveKafkaConsumerTemplate时使用配置类来消费事件:

public ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplate() {
    return new ReactiveKafkaConsumerTemplate<>(receiverOptions());
}
private ReceiverOptions<String, String> receiverOptions() {
    Map<String, Object> consumerConfig = consumerConfig();
    ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(consumerConfig);
        return receiverOptions.subscription(Collections.singletonList("test-topic"));
}

该receiverOptions()方法使用consumerConfig()中的设置配置Kafka消费者并订阅test-topic,以确保它监听消息。该reactiveKafkaConsumerTemplate()方法初始化一个ReactiveKafkaConsumerTemplate,为我们的反应式应用程序启用非阻塞、背压感知的消息消费。

2. 使用 Reactive Kafka 创建 Kafka 消费者
在 Reactor Kafka 中,Kafka Consumer 上选择的抽象是入站Flux,其中从 Kafka 收到的所有事件都由框架发布。通过调用 ReactiveKafkaConsumerTemplate 上的accept()、receiveAtmostOnce()、receiveAutoAck()和acceptExactlyOnce()方法之一来创建 此Flux 。

在这个例子中,我们使用receive()操作符来使用入站的Flux:

public class ConsumerService {
    private final ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplate;
    public Flux<String> consumeRecord() {
        return reactiveKafkaConsumerTemplate.receive()
          .map(ReceiverRecord::value)
          .doOnNext(msg -> log.info("Received: {}", msg))
          .doOnError(error -> log.error(
"Consumer error: {}", error.getMessage()));
    }
}

这种方法允许系统在消息到达时以响应方式处理消息,而不会阻塞或丢失消息。通过使用响应式流,消费者可以按照自己的节奏扩展和处理消息,并在必要时施加背压。在这里,我们通过doOnNext()记录收到的每条消息,并使用doOnError() 记录错误。

处理背压
使用 Reactor Kafka 消费者的主要优势之一是它支持背压。这可确保系统不会因高吞吐量而超负荷。我们可以使用limitRate()限制处理速率或使用buffer()进行批处理,而不是直接使用消息:

public Flux<String> consumeWithLimit() {
    return reactiveKafkaConsumerTemplate.receive()
      .limitRate(2)
      .map(ReceiverRecord::value);
}

在这里,我们一次最多请求两条消息,控制流量。这种方法可确保高效且可感知背压的消息处理。最后,它仅提取并返回消息值。

我们不需要单独处理它们,而是可以通过缓冲固定数量的记录,然后将它们作为一个组发出,从而批量使用它们:

public Flux<String> consumeAsABatch() {
    return reactiveKafkaConsumerTemplate.receive()
      .buffer(2)
      .flatMap(messages -> Flux.fromStream(messages.stream()
        .map(ReceiverRecord::value)));
}

在这里,我们最多缓冲两条记录,然后将它们作为一批发送。通过使用buffer(2),它将消息分组并一起处理,从而减少了单独处理的开销。

错误处理策略
在反应式 Kafka 消费者中,管道中的错误充当终止信号。这会导致消费者关闭,从而使服务实例继续运行而不会消费事件。Reactor Kafka 提供了各种策略来解决这个问题,例如使用retryWhen运算符的重试机制。这会捕获故障、重新订阅上游发布者并重新创建 Kafka 消费者。

Kafka 消费者的另一个常见问题是反序列化错误,当消费者由于意外格式而无法反序列化消息时会发生这种情况。为了处理所谓的错误,我们可以使用Spring Kafka 提供的ErrorHandlingDeserializer 。

1. 重试策略
当我们想要重试失败的操作时,重试策略是必不可少的。此策略可确保以固定延迟(例如五秒)持续重试,直到消费者成功重新连接或满足预定义的退出条件。

让我们为消费者实现一个重试策略,以便当发生错误时它可以自动重试消息处理:

public Flux<String> consumeWithRetryWithBackOff(AtomicInteger attempts) {
    return reactiveKafkaConsumerTemplate.receive()
      .flatMap(msg -> attempts.incrementAndGet() < 3 ? 
        Flux.error(new RuntimeException("Failure")) : Flux.just(msg))
      .retryWhen(Retry.fixedDelay(3, Duration.ofSeconds(1)))
      .map(ReceiverRecord::value);
}

在此示例中,Retry.backoff(3, Duration.ofSeconds(1))指定系统尝试重试最多3次,退避时间为1秒。

2. 使用ErrorHandlingDeserializer处理序列化错误
从 Kafka 消费消息时,如果消息格式与预期架构不匹配,我们将遇到反序列化错误。为了解决这个问题,我们可以使用 Spring Kafka 的ErrorHandlingDeserializer。这可以通过捕获反序列化错误来防止消费者失败。然后它将错误详细信息作为标头添加到 ReceiverRecord ,而不是丢弃消息或引发异常:

private Map<String, Object> errorHandlingConsumerConfig(){
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
    config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class);
    return config;
}