Kafka中手工提交偏移量的4种方法

在Kafka中,消费者从分区读取消息。在读取消息时,需要考虑一些问题,例如确定从分区中读取哪些消息,或者防止重复读取消息或在发生故障时丢失消息。解决这些问题的方法是使用偏移量。

在本教程中,我们将了解 Kafka 中的偏移量。我们将了解如何提交偏移量来管理消息消耗并讨论其方法和缺点。

什么是偏移量?
我们知道Kafka将消息存储在主题中,每个主题可以有多个分区。每个消费者从主题的一个分区读取消息。在这里,Kafka 在偏移量的帮助下跟踪消费者阅读的消息。偏移量是从零开始的整数,随着消息的存储而增加 1。

假设一个消费者从一个分区读取了 5 条消息。然后,根据配置,Kafka 将直到4 的偏移量标记为已提交(从零开始的序列)。消费者下次尝试读取消息时会消费偏移量为5 的消息。

如果没有偏移,就无法避免重复处理或数据丢失。这就是为什么它如此重要。

我们可以用数据库存储来类比。在数据库中,我们在执行 SQL 语句后提交以保存更改。同样,从分区读取后,我们提交偏移量来标记已处理消息的位置。

有四种方法可以提交偏移量。我们将详细研究每一个并讨论它们的用例、优点和缺点。

 让我们首先在 pom.xml中添加 Kafka Client API依赖项:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.1</version>
</dependency>

1. 自动提交
这是提交偏移量的最简单方法。默认情况下,Kafka 使用自动提交——每五秒提交一次 poll()方法返回的最大偏移量。poll()返回一组超时时间为10秒的消息,我们可以在代码中看到:

KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(KafkaConfigProperties.getTopic());
ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
for (ConsumerRecord<Long, String> message : messages) {
  // processed message
}

自动提交的问题是,如果应用程序发生故障,数据丢失的可能性非常高。当poll ()返回消息时,Kafka可能会在处理消息之前提交最大的偏移量。

假设poll() 返回 100 条消息,而消费者在自动提交发生时处理 60 条消息。然后,由于某些故障,消费者崩溃了。当新的消费者上线读取消息时,它从偏移量 101 开始读取,导致 61 到 100 之间的消息丢失。

因此,我们需要其他不存在此缺点的方法。答案是手动提交。

2. 手动同步提交
在手动提交中,无论是同步还是异步,都需要通过将默认属性( enabled.auto.commit属性)设置为false来禁用自动提交:

Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

禁用手动提交后,现在让我们了解commitSync()的用法:

KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(KafkaConfigProperties.getTopic());
ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
  //process the messages
consumer.commitSync();

此方法通过仅在处理消息后提交偏移量来防止数据丢失。但是,当消费者在提交偏移量之前崩溃时,它并不能防止重复读取。除此之外,它还会影响应用程序性能。

commitSync ()会阻塞代码直到完成。此外,如果出现错误,它会继续重试。这会降低应用程序的吞吐量,这是我们不希望的。因此,Kafka 提供了另一种解决方案,即异步提交,来解决这些缺点。

3. 手动异步提交
Kafka 提供commitAsync() 来异步提交偏移量。它通过在不同线程中提交偏移量来克服手动同步提交的性能开销。让我们实现一个异步提交来理解这一点:

KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props); 
consumer.subscribe(KafkaConfigProperties.getTopic()); 
ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
  //process the messages
consumer.commitAsync();

异步提交的问题是它在失败时不会重试。它依赖于下一次调用commitAsync(),这将提交最新的偏移量。

假设 300 是我们想要提交的最大偏移量,但由于某些问题,我们的commitAsync()失败了。在重试之前, commitAsync()的另一个调用可能会提交最大偏移量 400,因为它是异步的。当commitAsync()重试失败时,如果成功提交偏移量 300,它将覆盖之前提交的 400,从而导致重复读取。这就是commitAsync()不重试的原因。

4. 提交特定偏移量
有时,我们需要对偏移量进行更多控制。假设我们正在小批量处理消息,并希望在处理消息后立即提交偏移量。我们可以使用commitSync()和commitAsync()的重载方法,它接受一个映射参数来提交特定的偏移量:

KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(KafkaConfigProperties.getTopic());
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
int messageProcessed = 0;
  while (true) {
    ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
    for (ConsumerRecord<Long, String> message : messages) {
        // processed one message
      messageProcessed++;
      currentOffsets.put(
          new TopicPartition(message.topic(), message.partition()),
          new OffsetAndMetadata(message.offset() + 1));
      if (messageProcessed%50==0){
        consumer.commitSync(currentOffsets);
      }
    }
  }

在此代码中,我们管理一个 currentOffsets 映射,它以TopicPartition为键,以OffsetAndMetadata为值。我们在消息处理过程中将已处理消息的TopicPartition和OffsetAndMetadata插入到currentOffsets 映射中。当处理的消息数量达到 50 时,我们使用currentOffsets映射调用commitSync()将这些消息标记为已提交。

这种方式的行为与同步和异步提交相同。唯一的区别是,我们在这里决定要提交的偏移量,而不是 Kafka。