在Spring Boot中创建 Kafka 主题

在本教程中,您将学习如何在 Spring Boot 应用程序中创建新的 Kafka 主题。当新消息发送到 Kafka 主题时,也可以自动创建主题。然而,在生产环境中,主题自动创建通常是关闭的。这就是为什么在向 Kafka 发送消息之前创建主题被认为是一个很好的做法。

创建配置类
在 Spring Boot 应用程序中创建新的 Kafka 主题的第一步是创建一个新的配置类。这将是一个带有@Configuration 注释的简单 Java 类。

import org.springframework.context.annotation.Configuration;
 
@Configuration
public class KafkaConfig {

 
}


创建 Kafka Topic
第二步是创建一个新的 Java 方法,该方法将实际创建一个新的 Kafka Topic。

import java.util.Map;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaConfig {

    @Bean
    NewTopic createTopic() {
        return TopicBuilder.name("product-created-topic")
                .partitions(3)
                .replicas(3)
                .configs(Map.of(
"min.insync.replicas", "2"))
                .build();
    }
}

让我逐条教你这个方法。

  • 注意到这个方法使用了 @Bean 注解吗?这将把本方法创建并返回的 NewTopic 对象添加到 Spring 应用程序上下文中。这将使 NewTopic 对其他应用程序服务和组件可用。
  • 该主题是使用 Spring Kafka 提供的 TopicBuilder 类创建的。使用 .name() 方法为主题命名为 "product-created-topic"。
  • .partitions(3) 方法将主题的分区数设置为 3。 在 Kafka 中,一个主题可以分为多个分区,从而实现并行处理和可扩展性。通过增加分区数,可以提高 Kafka 系统的吞吐量和并行性。如果我像这里一样创建了一个有 3 个分区的主题,我就可以启动多达 3 个微服务来消耗这个主题中的消息。
  • .replicas(3) 方法将每个分区的副本数量设置为 3。 在 Kafka 中,副本是存储在不同代理上的主题数据的副本。因此,如果我在这里配置 3 个副本,就意味着这个主题将有 3 个副本。这将提高数据的耐用性。但这也意味着我需要在 Kafka 集群中运行三个代理。如果你只在本地计算机上用一台 Kafka 服务器测试这段代码,那么就将副本数量设为 1。 这里还需要注意的是,增加副本数量也会增加存储需求和代理之间的网络流量。因此,这里最好有一个合理的数字。例如,如果你说这个数据非常重要,你真的想确保它永远不会丢失。那么我就把这个数字设为三千,而不是 3。这可能已经太多了。因此,在决定一个主题的副本数量时,最好在容错性和资源利用率之间找到一个平衡点。
  • .configs(Map.of("min.insync.replicas", "2")) 方法为主题设置附加配置选项。在本例中,它将 "min.insync.replicas "配置设置为 2,这意味着必须至少有 2 个副本确认写入才算成功。
    这个主题总共有 3 个副本。有了这个配置,我们就可以说,当我们向这个主题发送信息时,必须至少有 2 个副本确认数据已成功存储。如果确认写入成功的副本数量低于这个值,那么生产者就会收到异常。由于我们必须等待副本确认收到信息,因此写入主题的速度会稍慢一些。但这也保证了数据存储的可靠性。


检查 Kafka 主题是否存在
要检查 Kafka 主题是否创建成功,我可以使用 Kafka CLI。要做到这一点

打开一个新的终端窗口、
将目录更改为 Kafka 文件夹,然后
运行以下命令:
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic product-created-topic

--describe 参数将帮助我们读取 Kafka 主题的详细信息。它将帮助我们读取分区数量、复制因子和其他配置细节。