在本节教程中,我们将学习如何在 Spring Boot Kafka 项目中创建 Kafka Producer 和 Consumer 。
Spring 团队 为 Apache Kafka 依赖项提供 Spring,以配合开发基于 Kafka 的消息传递解决方案。
在本教程中,我们使用 Kafka 作为消息系统在生产者和消费者之间发送消息。
1. 安装和设置 Apache Kafka
从官网下载Kafka https://kafka.apache.org/downloads
解压本地文件系统中的Kafka zip
运行以下命令以按正确顺序启动所有服务:
使用以下命令启动 Zookeeper 服务:
# Start the ZooKeeper service # Note: Soon, ZooKeeper will no longer be required by Apache Kafka. $ bin/zookeeper-server-start.sh config/zookeeper.properties
|
启动Kafka Broker
打开另一个终端会话并运行以下命令以启动 Kafka 代理:
# Start the Kafka broker service $ bin/kafka-server-start.sh config/server.properties
|
一旦所有服务都成功启动,您将拥有一个正在运行并可以使用的基本 Kafka 环境。2. 在 IntelliJ 中创建和设置 Spring Boot 项目
添加依赖项:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
|
在 IntelliJ 中导入并运行 spring boot 应用程序3.在application.properties文件中配置Kafka Producer和Consumer
在 application.properties 文件中添加Kafka broker地址以及Consumer和Producer相关配置。
打开 application.properties 文件及其以下内容:
spring.kafka.consumer.bootstrap-servers: localhost:9092 spring.kafka.consumer.group-id: group-id spring.kafka.consumer.auto-offset-reset: earliest spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.bootstrap-servers: localhost:9092 spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer: org.apache.kafka.common.serialization.StringSerializer
|
下面我们来了解一下上面spring boot提供的Kafka属性:
spring.kafka.consumer.group-id - 指定一个唯一的字符串,用于标识该消费者所属的消费者组。
spring.kafka.consumer.auto-offset-reset 属性 - 指定当 Kafka 中没有初始偏移量或者当前偏移量在服务器上不再存在时要做什么(例如,因为该数据已被删除):
- earliest:自动将偏移量重置为最早的偏移量
- latest:自动将偏移量重置为最新的偏移量
- none:如果没有找到消费者组的先前偏移量,则向消费者抛出异常
- 其他:向消费者抛出异常。
spring.kafka.consumer.key-deserializer - 指定键的反序列化器类。
spring.kafka.consumer.value-deserializer - 指定值的反序列化器类。
spring.kafka.producer.key-deserializer - 指定键的序列化器类。
spring.kafka.producer.value-deserializer - 指定值的序列化器类。
4.创建Kafka主题
要在启动时创建主题,请添加一个 NewTopic类型的 bean 。如果主题已经存在,则忽略该 bean。在此示例中,我们将使用主题名称“javaguides”。
让我们创建一个 KafkaTopicConfig 文件并添加以下内容:
package net.javaguides.springbootkafka;
import org.apache.kafka.clients.admin.NewTopic; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.TopicBuilder;
@Configuration public class KafkaTopicConfig {
@Bean public NewTopic javaguidesTopic(){ return TopicBuilder.name("javaguides") .build(); } }
|
5.创建Kafka生产者
创建生产者将就该主题编写我们的消息。
KafkaTemplate
好吧,Spring boot 为 Spring 的KafkaTemplate提供了自动配置 ,因此您可以直接在自己的 bean 中自动装配它。
例如:
package net.javaguides.springbootkafka.kafka;
import net.javaguides.springbootkafka.utils.AppConstants; import org.apache.kafka.clients.admin.NewTopic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service;
@Service public class KafkaProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);
@Autowired private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String message){ LOGGER.info(String.format("Message sent -> %s", message)); kafkaTemplate.send(AppConstants.TOPIC_NAME, message); } }
|
创建一个utils包,并在该包中创建 具有以下内容的AppConstants :package net.javaguides.springbootkafka.utils;
public class AppConstants { public static final String TOPIC_NAME = "javaguides"; public static final String GROUP_ID = "group_id"; }
|
KafKaProducer 类使用 KafkaTemplate 将消息发送到配置的主题名称。6. 创建 REST API 发送消息
创建控制器包,在控制器包中创建 KafkaProducerController ,其中包含以下内容:
package net.javaguides.springbootkafka;
import net.javaguides.springbootkafka.kafka.KafkaProducer; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*;
@RestController @RequestMapping("/api/v1/kafka") public class KafkaProducerController {
private KafkaProducer kafkaProducer;
public KafkaProducerController(KafkaProducer kafkaProducer) { this.kafkaProducer = kafkaProducer; }
@GetMapping("/publish") public ResponseEntity<String> publish(@RequestParam("message") String message){ kafkaProducer.sendMessage(message); return ResponseEntity.ok("Message sent to kafka topic"); } }
|
通过命令行查看主题消息:
bin/kafka-console-consumer.sh --topic javaguides --from-beginning --bootstrap-server localhost:9092
|
确保更改主题名称。在我们的例子中,“javaguides”是主题名称。7.创建Kafka消费者
Kafka Consumer 是负责读取消息并根据您自己的业务逻辑需要进行处理的服务。
要设置它,请输入以下内容:
package net.javaguides.springbootkafka.kafka;
import net.javaguides.springbootkafka.utils.AppConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service;
@Service public class KafKaConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafKaConsumer.class);
@KafkaListener(topics = AppConstants.TOPIC_NAME, groupId = AppConstants.GROUP_ID) public void consume(String message){ LOGGER.info(String.format("Message received -> %s", message)); } }
|
在这里,我们告诉我们的方法 void to consume (String message) 订阅用户的主题并将每条消息发送到应用程序日志。在您的实际应用程序中,您可以按照业务要求的方式处理消息。KafkaListener 端点:
@KafkaListener(topics = AppConstants.TOPIC_NAME, groupId = AppConstants.GROUP_ID) public void consume(String message){ LOGGER.info(String.format("Message received -> %s", message)); }
|
8.演示
让我们运行 Spring boot 应用程序并进行演示。确保 Zookeeper 和 Kafka 服务应该已启动并正在运行。
http://localhost:8080/api/v1/kafka/publish?message=hello%20world
源码: Github