Spring Boot Kafka 生产者和消费者示例


在本节教程中,我们将学习如何在 Spring Boot Kafka 项目中创建 Kafka Producer 和 Consumer  。
Spring 团队 为 Apache Kafka 依赖项提供 Spring,以配合开发基于 Kafka 的消息传递解决方案。 
在本教程中,我们使用 Kafka 作为消息系统在生产者和消费者之间发送消息。

1. 安装和设置 Apache Kafka
从官网下载Kafka  https://kafka.apache.org/downloads
解压本地文件系统中的Kafka zip
运行以下命令以按正确顺序启动所有服务:

使用以下命令启动 Zookeeper 服务:

# Start the ZooKeeper service
# Note: Soon, ZooKeeper will no longer be required by Apache Kafka.
$ bin/zookeeper-server-start.sh config/zookeeper.properties

启动Kafka Broker

打开另一个终端会话并运行以下命令以启动 Kafka 代理:

# Start the Kafka broker service
$ bin/kafka-server-start.sh config/server.properties

一旦所有服务都成功启动,您将拥有一个正在运行并可以使用的基本 Kafka 环境。

2. 在 IntelliJ 中创建和设置 Spring Boot 项目
添加依赖项:

<dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
</dependency>
   
 <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
</dependency>

在 IntelliJ 中导入并运行 spring boot 应用程序

3.在application.properties文件中配置Kafka Producer和Consumer
在 application.properties 文件中添加Kafka broker地址以及Consumer和Producer相关配置。

打开 application.properties 文件及其以下内容:

spring.kafka.consumer.bootstrap-servers: localhost:9092
spring.kafka.consumer.group-id: group-id
spring.kafka.consumer.auto-offset-reset: earliest
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.producer.bootstrap-servers: localhost:9092
spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer: org.apache.kafka.common.serialization.StringSerializer

下面我们来了解一下上面spring boot提供的Kafka属性:

spring.kafka.consumer.group-id  - 指定一个唯一的字符串,用于标识该消费者所属的消费者组。

spring.kafka.consumer.auto-offset-reset 属性 - 指定当 Kafka 中没有初始偏移量或者当前偏移量在服务器上不再存在时要做什么(例如,因为该数据已被删除):

  • earliest:自动将偏移量重置为最早的偏移量
  • latest:自动将偏移量重置为最新的偏移量
  • none:如果没有找到消费者组的先前偏移量,则向消费者抛出异常
  • 其他:向消费者抛出异常。

spring.kafka.consumer.key-deserializer  - 指定键的反序列化器类。

spring.kafka.consumer.value-deserializer  - 指定值的反序列化器类。

spring.kafka.producer.key-deserializer  - 指定键的序列化器类。

spring.kafka.producer.value-deserializer  - 指定值的序列化器类。

4.创建Kafka主题
要在启动时创建主题,请添加一个 NewTopic类型的 bean 。如果主题已经存在,则忽略该 bean。在此示例中,我们将使用主题名称“javaguides”。

让我们创建一个 KafkaTopicConfig 文件并添加以下内容:

package net.javaguides.springbootkafka;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic javaguidesTopic(){
        return TopicBuilder.name("javaguides")
                .build();
    }
}

5.创建Kafka生产者
创建生产者将就该主题编写我们的消息。

KafkaTemplate
好吧,Spring boot 为 Spring 的KafkaTemplate提供了自动配置  ,因此您可以直接在自己的 bean 中自动装配它。

例如:

package net.javaguides.springbootkafka.kafka;

import net.javaguides.springbootkafka.utils.AppConstants;
import org.apache.kafka.clients.admin.NewTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message){
        LOGGER.info(String.format("Message sent -> %s", message));
        kafkaTemplate.send(AppConstants.TOPIC_NAME, message);
    }
}

创建一个utils包,并在该包中创建  具有以下内容的AppConstants :

package net.javaguides.springbootkafka.utils;

public class AppConstants {
    public static final String TOPIC_NAME = "javaguides";
    public static final String GROUP_ID =
"group_id";
}

KafKaProducer 类使用 KafkaTemplate 将消息发送到配置的主题名称。

6. 创建 REST API 发送消息
创建控制器包,在控制器包中创建 KafkaProducerController  ,其中包含以下内容:

package net.javaguides.springbootkafka;

import net.javaguides.springbootkafka.kafka.KafkaProducer;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/api/v1/kafka")
public class KafkaProducerController {

    private KafkaProducer kafkaProducer;

    public KafkaProducerController(KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    @GetMapping(
"/publish")
    public ResponseEntity<String> publish(@RequestParam(
"message") String message){
        kafkaProducer.sendMessage(message);
        return ResponseEntity.ok(
"Message sent to kafka topic");
    }
}

通过命令行查看主题消息:
bin/kafka-console-consumer.sh --topic javaguides --from-beginning --bootstrap-server localhost:9092

确保更改主题名称。在我们的例子中,“javaguides”是主题名称。

7.创建Kafka消费者
Kafka Consumer 是负责读取消息并根据您自己的业务逻辑需要进行处理的服务。 

要设置它,请输入以下内容:

package net.javaguides.springbootkafka.kafka;

import net.javaguides.springbootkafka.utils.AppConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafKaConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafKaConsumer.class);

    @KafkaListener(topics = AppConstants.TOPIC_NAME,
                    groupId = AppConstants.GROUP_ID)
    public void consume(String message){
        LOGGER.info(String.format("Message received -> %s", message));
    }
}

在这里,我们告诉我们的方法 void to consume (String message) 订阅用户的主题并将每条消息发送到应用程序日志。在您的实际应用程序中,您可以按照业务要求的方式处理消息。

KafkaListener 端点:

    @KafkaListener(topics = AppConstants.TOPIC_NAME,
                    groupId = AppConstants.GROUP_ID)
    public void consume(String message){
        LOGGER.info(String.format("Message received -> %s", message));
    }

8.演示
让我们运行 Spring boot 应用程序并进行演示。确保 Zookeeper 和 Kafka 服务应该已启动并正在运行。

http://localhost:8080/api/v1/kafka/publish?message=hello%20world

源码: Github