Kafka跨分区传递消息如何保证严格的顺序

banq

在本文中,我们探讨了 Kafka 基于分区的架构如何在处理消息传递的同时,在最重要的位置保持排序保证。我们已经看到,Kafka 优先考虑可扩展性和吞吐量,而不是全局排序,从而提供了符合大多数实际需求的分区级保证。关键在于理解分区是 Kafka 中并行性和排序的单位。通过围绕这些约束设计应用程序,我们可以构建可扩展的系统,高效处理数百万条消息。

Apache Kafka是一个分布式流处理平台,它通过基于分区的架构处理高吞吐量数据。当我们向 Kafka 主题发送消息时,它们会分布在多个分区上进行并行处理。这种设计使 Kafka 能够在保持性能的同时进行水平扩展。

了解 Kafka 如何跨分区传递消息对于构建可靠的分布式系统至关重要。分区策略会影响消息排序、消费者扩展以及整体系统行为。

在本文中,我们将探讨当主题包含多个分区时 Kafka 如何传递消息,重点关注路由策略、排序保证和消费者协调。

消息路由到分区
Kafka 使用两种主要策略来根据消息是否包含键来确定哪个分区接收消息。这一决策从根本上影响了消息的分发和处理方式。

1. 基于键的分区
当我们发送带有键的消息时,Kafka 会使用确定性哈希函数(很可能是 Murmur2 哈希)将其一致地路由到同一分区。这确保了相关消息保持在一起:

public void sendMessagesWithKey() {
    String key = "user-123";
    for (int i = 0; i <= 5; i++) {
        ProducerRecord<String, String> record = new ProducerRecord<>(
"user-events", key, "Event " + i);
        producer.send(record, (metadata, exception) -> {
            if (exception == null) {
                logger.info(
"Key: {}, Partition: {}, Offset: {}", key, metadata.partition(),
                  metadata.offset());
            }
        });
    }
    producer.flush();
}

Kafka 对键应用 MurmurHash2 算法,并使用分区计数进行模运算来选择目标分区。所有带有键“user-123”的消息将始终位于同一分区,从而确保它们按顺序处理。当我们需要维护特定实体的状态或顺序时,这尤其有用。

2. 无密钥消息的循环机制
没有密钥的消息使用粘性分区进行分发,这是一种通过有效地批处理消息来提高吞吐量的策略:

public Map<Integer, Integer> sendMessagesWithoutKey() {
    Map<Integer, Integer> partitionCounts = new HashMap<>();
    for (int i = 0; i < 100; i++) {
        ProducerRecord<String, String> record = new ProducerRecord<>("events", null,  // no key
           
"Message " + i);
        producer.send(record, (metadata, exception) -> {
            if (exception == null) {
                synchronized (partitionCounts) {
                    partitionCounts.merge(metadata.partition(), 1, Integer::sum);
                }
            }
        });
    }
    producer.flush();
    logger.info(
"Distribution across partitions: {}", partitionCounts);
    return partitionCounts;
}

由于没有键,Kafka 会先将批次填充到一个分区,然后再移动到下一个分区。与纯循环分配相比,这减少了网络请求并提高了压缩率。粘性行为会持续到批次已满或延迟时间到期。

跨分区的排序保证
Kafka 的顺序保证完全依赖于分区结构。理解分区结构对于设计正确处理顺序操作的系统至关重要。

1. 分区排序
每个分区通过顺序偏移分配来保持严格的顺序。消息将附加到分区日志中,并按照该顺序进行使用:

public void demonstratePartitionOrdering() throws InterruptedException {
    String orderId = "order-789";
    String[] events = {
"created", "validated", "paid", "shipped", "delivered" };
    for (String event : events) {
        ProducerRecord<String, String> record = new ProducerRecord<>(
"orders", orderId, event);
        producer.send(record, (metadata, exception) -> {
            if (exception == null) {
                logger.info(
"Event: {} -> Partition: {}, Offset: {}", event, metadata.partition(),
                  metadata.offset());
            }
        });
       
// small delay to demonstrate sequential processing
        Thread.sleep(100);
    }
    producer.flush();
}

由于所有消息共享相同的密钥,它们会路由到同一个分区,并在消费过程中保持其顺序。此保证在同一个分区内是绝对的:消费者将始终按照事件的生成顺序读取它们。

2. 无跨分区排序
具有不同键的消息可能会落在不同的分区中,并且 Kafka 不提供跨分区的排序保证:

public void demonstrateCrossPartitionBehavior() {
    long startTime = System.currentTimeMillis();
    // these will likely go to different partitions
    producer.send(new ProducerRecord<>(
"events", "key-A", "First at " + (System.currentTimeMillis() - startTime) + "ms"));
    producer.send(new ProducerRecord<>(
"events", "key-B", "Second at " + (System.currentTimeMillis() - startTime) + "ms"));
    producer.send(new ProducerRecord<>(
"events", "key-C", "Third at " + (System.currentTimeMillis() - startTime) + "ms"));
    producer.flush();
}

即使我们按顺序发送这些消息,由于它们位于不同的分区,消费者也可能无序地处理它们。由于消费者负载或网络状况,一个分区的处理速度可能比另一个分区更快。

消费者组协调
Kafka 通过在组中的消费者之间分配分区来实现水平扩展。这种协调是 Kafka 可扩展性模型的基础。

1. 组内分区分配
当多个消费者加入同一个组时,Kafka 会将每个分区分配给一个消费者,从而防止组内重复处理:

public void createConsumerGroup() {
    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    props.put(
"group.id", "order-processors");
    props.put(
"key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(
"value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(
"auto.offset.reset", "earliest");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(List.of(
"orders"));
    int recordCount = 0;
    while (recordCount < 10) {
// process limited records for demo
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            logger.info(
"Consumer: {}, Partition: {}, Offset: {}, Value: {}", Thread.currentThread()
                .getName(), record.partition(), record.offset(), record.value());
            recordCount++;
        }
        consumer.commitSync();
    }
    consumer.close();
}

如果组中有六个分区和三个消费者,则每个消费者通常处理两个分区。这种分配方式可确保负载均衡,并且组内不会出现消息重复。Kafka 的组协调器会自动管理这些分配。

2. 多个组用于扇出
不同的消费者组可以独立处理相同的消息,从而使多个应用程序能够对相同的事件做出反应:

public void startMultipleGroups() {
    String[] groupIds = { "analytics-group", "audit-group", "notification-group" };
    CountDownLatch latch = new CountDownLatch(groupIds.length);
    for (String groupId : groupIds) {
        startConsumerGroup(groupId, latch);
    }
    try {
        latch.await(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        Thread.currentThread()
            .interrupt();
    }
}
private void startConsumerGroup(String groupId, CountDownLatch latch) {
    Properties props = new Properties();
    props.put(
"bootstrap.servers", bootstrapServers);
   
// other properties
    new Thread(() -> {
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList(
"orders"));
            int recordCount = 0;
            while (recordCount < 5) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                recordCount += processRecordsForGroup(groupId, records);
            }
        } finally {
            latch.countDown();
        }
    }).start();
}

每个组维护各自的偏移量跟踪,允许不同的服务按照各自的节奏处理消息。此模式支持事件驱动架构,其中多个系统可以对同一业务事件做出响应。

处理消费者重新平衡
当消费者加入或离开某个组时,Kafka 会重新平衡分区分配。此过程可确保持续运行,但可能会造成暂时中断。我们可以使用协作式重新平衡来最大程度地降低影响:

public void configureCooperativeRebalancing() {
    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    props.put(
"group.id", "cooperative-group");
    props.put(
"partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
    props.put(
"key.deserializer", StringDeserializer.class.getName());
    props.put(
"value.deserializer", StringDeserializer.class.getName());
    props.put(
"auto.offset.reset", "earliest");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(
"orders"), new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection partitions) {
            logger.info(
"Revoked partitions: {}", partitions);
           
// complete processing of current records
        }
        @Override
        public void onPartitionsAssigned(Collection partitions) {
            logger.info(
"Assigned partitions: {}", partitions);
           
// initialize any partition-specific state
        }
    });
   
// process a few records to demonstrate
    int recordCount = 0;
    while (recordCount < 5) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        recordCount += records.count();
    }
    consumer.close();
}

协作式重新平衡允许未受影响的消费者继续处理,同时仅重新分配必要的分区。这显著降低了扩展操作的影响。

传递保证
为了实现可靠的消息处理,我们通常通过手动控制偏移提交来实现至少一次传递:

public void processWithManualCommit() {
    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    props.put(
"group.id", "manual-commit-group");
    props.put(
"enable.auto.commit", "false");
    props.put(
"max.poll.records", "10");
    props.put(
"key.deserializer", StringDeserializer.class.getName());
    props.put(
"value.deserializer", StringDeserializer.class.getName());
    props.put(
"auto.offset.reset", "earliest");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(
"orders"));
    int totalProcessed = 0;
    while (totalProcessed < 10) { 
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            try {
                processOrder(record);
                totalProcessed++;
            } catch (Exception e) {
                logger.error(
"Processing failed for offset: {}", record.offset(), e);
                break;
            }
        }
        if (!records.isEmpty()) {
            consumer.commitSync();
            logger.info(
"Committed {} records", records.count());
        }
    }
    consumer.close();
}

这种方法可以确保我们的消息在处理失败时不会丢失,尽管我们必须设计处理逻辑来优雅地处理潜在的重复消息。

整合
当生产者向 Kafka 发送消息时,该过程从分区选择开始。带键的消息使用一致性哈希算法来确保相关数据保持在同一分区中。无键消息使用粘性分区来提高批处理效率。在每个分区内,Kafka 会分配连续的偏移量以保持严格的顺序,但跨分区不存在全局排序。

每个分区分配给每个组的一个消费者,从而实现并行处理,避免重复。不同的消费者组独立消费相同的消息,每个组都有单独的偏移量跟踪。当消费者加入或离开时,Kafka 会使用协作策略重新平衡分区分配,以减少中断。这种设计使 Kafka 能够水平扩展,同时保持每个分区内的顺序。