Kafka 中使用 @KafkaListener 批量消费消息

在本教程中,我们将讨论如何使用 Spring Kafka 库的@KafkaListener注释批量处理 Kafka 消息。

Kafka代理是一个中间件,可帮助持久保存来自源系统的消息。目标系统配置为定期轮询 Kafka 主题/队列,然后从中读取消息。

这可以防止目标系统或服务停机时丢失消息。当目标服务恢复时,它们会继续接受未处理的消息。因此,这种架构有助于提高消息的持久性,从而提高系统的容错能力。

在本文中,我们讨论了基本 Kafka 侦听器和启用批处理的侦听器之间的区别。批处理有助于同时处理多条消息以提高应用程序性能。但是,对批处理量和消息大小进行适当的限制对于控制应用程序的性能非常重要。因此,必须在仔细而严格的基准测试过程之后对其进行优化。

为什么要批量处理消息?
多个源或事件生产者经常会同时向同一个 Kafka 队列或主题发送消息。因此,大量消息可能会堆积其中。如果目标服务或消费者在一次会话中收到这些大量消息,他们可能无法有效地处理这些消息。

这可能会产生连锁反应,从而导致瓶颈。最终,这会影响所有依赖消息的下游流程。因此,消费者或消息监听者应该限制他们在同一时间点可以处理的消息数量。

要以批处理模式运行,我们必须考虑主题上发布的数据量和应用程序的容量来配置正确的批处理大小。此外,消费者应用程序应设计为批量处理消息以满足 SLA。

此外,如果没有批处理,消费者必须定期轮询 Kafka 主题才能单独获取消息。这种方法会给计算资源带来压力。因此,批处理比每次轮询处理一条消息要高效得多。

然而, 批处理可能不适合某些情况:

  • 消息量较小
  • 立即处理对于时间敏感的应用至关重要
  • 计算和内存资源受到限制
  • 严格的消息排序至关重要

使用@KafkaListener注释进行批处理
为了理解批处理,我们首先要定义一个用例。然后,我们将首先使用基本消息处理来实现它,然后再使用批处理来实现它。这样,我们就可以更好地理解批量处理消息的重要性。

1. 用例描述
假设许多关键 IT 基础设施设备(例如服务器和网络设备)在公司的数据中心运行。多个监控工具跟踪这些设备的 KPI(关键绩效指标)。由于运营团队希望进行主动监控,他们希望获得实时可操作的分析。因此,存在严格的 SLA 来将 KPI 传输到目标分析应用程序。

运营团队配置监控工具,定期将 KPI 发送到 Kafka 主题。消费者应用程序从主题读取消息,然后将其推送到数据湖。应用程序从数据湖读取数据并生成实时分析。

让我们实现一个配置和未配置批处理的消费者。我们将分析这两种实现的区别和结果。

2. 先决条件
在开始实施批处理之前,了解 Spring Kafka 库至关重要。幸运的是,我们在文章《使用 Spring 的 Apache Kafka 简介》中讨论了这个主题,这为我们提供了急需的动力。

为了学习目的,我们需要一个 Kafka 实例。因此,为了快速入门,我们将使用嵌入式 Kafka。

最后,我们需要一个程序,在 Kafka 代理中创建一个事件队列,并定期向其发布示例消息。本质上,我们将使用Junit5来理解这个概念。

3. 基本监听器
让我们从一个基本的侦听器开始,该侦听器从 Kafka 代理逐个读取消息。我们将在KafkaKpiConsumerWithNoBatchConfig配置类中定义ConcurrentKafkaListenerContainerFactory bean :

public class KafkaKpiConsumerWithNoBatchConfig {
    @Bean(name = "kafkaKpiListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaKpiBasicListenerContainerFactory(
      ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory
          = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(1);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
}

kafkaKpiBasicListenerContainerFactory()方法返回kafkaKpiListenerContainerFactory bean。该 bean 有助于配置一个可以一次处理一条消息的基本侦听器:

@Component
public class KpiConsumer {
    private CountDownLatch latch = new CountDownLatch(1);
    private ConsumerRecord<String, String> message;
    @Autowired
    private DataLakeService dataLakeService;
    @KafkaListener(
      id = "kpi-listener",
      topics =
"kpi_topic",
      containerFactory =
"kafkaKpiListenerContainerFactory")
    public void listen(ConsumerRecord<String, String> record) throws InterruptedException {
        this.message = record;
        latch.await();
        List<String> messages = new ArrayList<>();
        messages.add(record.value());
        dataLakeService.save(messages);
       
//reset the latch
        latch = new CountDownLatch(1);
    }
   
//General getter methods
}

我们已经将@KafkaListener注释应用于listen()方法。该注释有助于设置侦听器主题和侦听器容器工厂 bean。KpiConsumer类中的java.util.concurrent.CountDownLatch对象有助于控制 Junit5 测试中的消息处理。我们将使用它来理解整个概念。

CountDownLatch #await()方法会暂停侦听器线程,当测试方法调用CountDownLatch#countDown()方法时,线程会恢复。如果没有这个,理解和跟踪消息会很困难。最后,下游DataLakeService#save()方法会收到一条消息进行处理。

现在让我们看一下帮助跟踪KpiListener类处理的消息的方法:

@RepeatedTest(10)
void givenKafka_whenMessage1OnTopic_thenListenerConsumesMessages(RepetitionInfo repetitionInfo) {
    String testNo = String.valueOf(repetitionInfo.getCurrentRepetition());
    assertThat(kpiConsumer.getMessage().value()).isEqualTo("Test KPI Message-".concat(testNo));
    kpiConsumer.getLatch().countDown();
}

当监控工具将 KPI 消息发布到kpi_topic Kafka 主题时,监听器会按照消息到达的顺序接收它们。

该方法每次执行时,都会跟踪到达KpiListener#listen()方法的消息,确认消息顺序后,释放闩锁,监听器完成处理。

4. 具有批处理功能的监听器
现在,让我们探索 Kafka 中的批处理支持。我们首先在 Spring 配置类中定义ConcurrentKafkaListenerContainerFactory bean:

@Bean(name="kafkaKpiListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaKpiBatchListenerContainerFactory(
  ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory
      = new ConcurrentKafkaListenerContainerFactory();
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
"20");
    consumerFactory.updateConfigs(configProps);
    factory.setConcurrency(1);
    factory.setConsumerFactory(consumerFactory);
    factory.getContainerProperties().setPollTimeout(3000);
    factory.setBatchListener(true);
    return factory;
}

该方法与上一节中定义的kafkaKpiBasicListenerContainerFactory()方法类似。我们通过调用ConsumerFactory#setBatchListener()方法启用了批处理。

此外,我们在ConsumerConfig.MAX_POLL_RECORDS_CONFIG属性的帮助下设置了每次轮询的最大消息数。ConsumerFactory #setConcurrency()有助于设置并发消费者线程的数量,同时处理消息。我们可以参考Spring Kafka 官方网站中的其他配置。

此外,还有诸如ConsumerConfig.DEFAULT_FETCH_MAX_BYTES和ConsumerConfig.DEFAULT_FETCH_MIN_BYTES 等配置属性 ,也可以帮助限制消息大小。

现在,我们来看看消费者:

@Component
public class KpiBatchConsumer {
    private CountDownLatch latch = new CountDownLatch(1);
    @Autowired
    private DataLakeService dataLakeService;
    private List<String> receivedMessages = new ArrayList<>();
    @KafkaListener(
      id = "kpi-batch-listener",
      topics =
"kpi_batch_topic",
      batch =
"true",
      containerFactory =
"kafkaKpiListenerContainerFactory")
    public void listen(ConsumerRecords<String, String> records) throws InterruptedException {        
        records.forEach(record -> receivedMessages.add(record.value()));
        latch.await();
        dataLakeService.save(receivedMessages);
        latch = new CountDownLatch(1);
    }
   
// Standard getter methods
}

KpiBatchConsumer与之前定义的KpiConsumer类类似,只是@KafkaListener注释具有额外的batch属性。listen()方法采用ConsumerRecords类型的参数,而不是ConsumerRecord。我们可以遍历ConsumerRecords对象以获取batch 中的所有ConsumerRecord元素。

侦听器还可以按消息到达的顺序处理批量接收的消息。但是,在 Kafka 中跨主题分区维护消息批次的顺序非常复杂。

这里ConsumerRecord表示发布到 Kafka 主题的消息。最终,我们使用更多消息调用DataLakeService#save()方法。最后, CountDownLatch类起着我们之前看到的相同作用。

假设有 100 条 KPI 消息被推送到kpi_batch_topic Kafka 主题中。现在我们可以检查监听器的运行情况:

@RepeatedTest(5)
void givenKafka_whenMessagesOnTopic_thenListenerConsumesMessages() {
    int messageSize = kpiBatchConsumer.getReceivedMessages().size();
    assertThat(messageSize % 20).isEqualTo(0);
    kpiBatchConsumer.getLatch().countDown();
}

与基本监听器逐个接收消息不同,这次监听器KpiBatchConsumer#listen()方法接收一批包含20 条KPI 消息。