Apache Kafka 是一个流行的分布式事件流平台,与Project Reactor结合使用时,它可以构建弹性和反应性应用程序。Reactor Kafka是一个基于 Reactor 和Kafka生产者/消费者 API 构建的反应性 API。
Reactor Kafka API 使我们能够使用具有背压支持的函数性、非阻塞 API 将消息发布到 Kafka 并从 Kafka 消费消息。这意味着系统可以根据需求和资源可用性动态调整消息处理速率,从而确保高效且容错的运行。
在本教程中,我们将探索如何使用Reactor Kafka创建 Kafka 消费者,以确保容错性和可靠性。我们将深入探讨背压、重试和错误处理等关键概念,同时以非阻塞方式异步处理消息。
设置项目
首先,我们应该在项目中包含Spring Kafka和Reactor Kafka Maven 依赖项:
<dependency> |
反应式 Kafka 消费者设置
接下来,我们将使用 Reactor Kafka 设置 Kafka 消费者。我们将首先配置必要的消费者属性,确保它已正确设置以连接到 Kafka。然后,我们将初始化消费者,最后了解如何被动地消费消息。
1. 配置 Kafka 消费者属性
现在,让我们配置 Reactive Kafka 消费者属性。KafkaConfig配置类定义了消费者要使用的属性:
public class KafkaConfig { |
ConsumerConfig.GROUP_ID_CONFIG定义消费者组,它启用跨消费者的消息负载平衡。同一组中的所有消费者负责处理来自主题的消息。
接下来我们在实例化ReactiveKafkaConsumerTemplate时使用配置类来消费事件:
public ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplate() { |
该receiverOptions()方法使用consumerConfig()中的设置配置Kafka消费者并订阅test-topic,以确保它监听消息。该reactiveKafkaConsumerTemplate()方法初始化一个ReactiveKafkaConsumerTemplate,为我们的反应式应用程序启用非阻塞、背压感知的消息消费。
2. 使用 Reactive Kafka 创建 Kafka 消费者
在 Reactor Kafka 中,Kafka Consumer 上选择的抽象是入站Flux,其中从 Kafka 收到的所有事件都由框架发布。通过调用 ReactiveKafkaConsumerTemplate 上的accept()、receiveAtmostOnce()、receiveAutoAck()和acceptExactlyOnce()方法之一来创建 此Flux 。
在这个例子中,我们使用receive()操作符来使用入站的Flux:
public class ConsumerService { |
这种方法允许系统在消息到达时以响应方式处理消息,而不会阻塞或丢失消息。通过使用响应式流,消费者可以按照自己的节奏扩展和处理消息,并在必要时施加背压。在这里,我们通过doOnNext()记录收到的每条消息,并使用doOnError() 记录错误。
处理背压
使用 Reactor Kafka 消费者的主要优势之一是它支持背压。这可确保系统不会因高吞吐量而超负荷。我们可以使用limitRate()限制处理速率或使用buffer()进行批处理,而不是直接使用消息:
public Flux<String> consumeWithLimit() { |
在这里,我们一次最多请求两条消息,控制流量。这种方法可确保高效且可感知背压的消息处理。最后,它仅提取并返回消息值。
我们不需要单独处理它们,而是可以通过缓冲固定数量的记录,然后将它们作为一个组发出,从而批量使用它们:
public Flux<String> consumeAsABatch() { |
在这里,我们最多缓冲两条记录,然后将它们作为一批发送。通过使用buffer(2),它将消息分组并一起处理,从而减少了单独处理的开销。
错误处理策略
在反应式 Kafka 消费者中,管道中的错误充当终止信号。这会导致消费者关闭,从而使服务实例继续运行而不会消费事件。Reactor Kafka 提供了各种策略来解决这个问题,例如使用retryWhen运算符的重试机制。这会捕获故障、重新订阅上游发布者并重新创建 Kafka 消费者。
Kafka 消费者的另一个常见问题是反序列化错误,当消费者由于意外格式而无法反序列化消息时会发生这种情况。为了处理所谓的错误,我们可以使用Spring Kafka 提供的ErrorHandlingDeserializer 。
1. 重试策略
当我们想要重试失败的操作时,重试策略是必不可少的。此策略可确保以固定延迟(例如五秒)持续重试,直到消费者成功重新连接或满足预定义的退出条件。
让我们为消费者实现一个重试策略,以便当发生错误时它可以自动重试消息处理:
public Flux<String> consumeWithRetryWithBackOff(AtomicInteger attempts) { |
在此示例中,Retry.backoff(3, Duration.ofSeconds(1))指定系统尝试重试最多3次,退避时间为1秒。
2. 使用ErrorHandlingDeserializer处理序列化错误
从 Kafka 消费消息时,如果消息格式与预期架构不匹配,我们将遇到反序列化错误。为了解决这个问题,我们可以使用 Spring Kafka 的ErrorHandlingDeserializer。这可以通过捕获反序列化错误来防止消费者失败。然后它将错误详细信息作为标头添加到 ReceiverRecord ,而不是丢弃消息或引发异常:
private Map<String, Object> errorHandlingConsumerConfig(){ |