Kafka中实现同步通信教程

在本文中,我们探讨了如何使用 Apache Kafka 实现 Spring Boot 应用程序中两个组件之间的同步通信。
我们完成了必要的配置并模拟了通知调度系统。
通过使用ReplyingKafkaTemplate,我们可以将 Apache Kafka 的异步特性转换为同步请求-回复模式。这种方法有点不合常规,因此在生产中实施之前,仔细评估它是否与项目的架构一致非常重要。

Apache Kafka已成为构建事件驱动架构的最流行和广泛使用的消息传递系统之一,其中一个微服务向某个主题发布消息,另一个微服务则异步使用和处理该消息。

但是,有些情况下,发布者微服务需要立即响应才能继续进行进一步处理。虽然 Kafka 本质上是为异步通信而设计的,但它可以配置为通过单独的主题支持同步请求-回复通信。

在本教程中,我们将探讨如何使用 Apache Kafka 在 Spring Boot 应用程序中实现同步请求-答复通信。

设置项目
为了演示,我们将模拟一个通知调度系统。我们将创建一个 Spring Boot 应用程序,它既充当生产者,又充当消费者。

依赖项
让我们首先将Spring Kafka 依赖项添加到项目的pom.xml文件中:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.3.4</version>
</dependency>

这种依赖关系为我们提供了建立连接和与配置的 Kafka 实例交互所需的类。

定义请求-回复消息
接下来,让我们定义两个记录来表示我们的请求和回复消息:

record NotificationDispatchRequest(String emailId, String content) {
}
public record NotificationDispatchResponse(UUID notificationId) {
}

这里,NotificationDispatchRequest记录保存通知的emailId和内容,而NotificationDispatchResponse记录包含处理请求后生成的唯一notificationId 。

定义 Kafka 主题和配置属性
现在,让我们定义请求和回复 Kafka 主题。此外,我们将配置从消费者组件接收回复的超时时间。

我们将这些属性存储在项目的application.yaml文件中,并使用@ConfigurationProperties将值映射到 Java 记录,我们的配置和服务层可以引用该记录:

@Validated
@ConfigurationProperties(prefix = "com.baeldung.kafka.synchronous")
record SynchronousKafkaProperties(
    @NotBlank
    String requestTopic,
    @NotBlank
    String replyTopic,
    @NotNull @DurationMin(seconds = 10) @DurationMax(minutes = 2)
    Duration replyTimeout
) {
}

我们还添加了验证注释,以确保所有必需的属性都已正确配置。如果任何定义的验证失败,Spring ApplicationContext将无法启动。这使我们能够遵循快速失败原则。

下面是我们的application.yaml文件的片段,它定义了将自动映射到我们的SynchronousKafkaProperties记录的必需属性:

com:
  baeldung:
    kafka:
      synchronous:
        request-topic: notification-dispatch-request
        reply-topic: notification-dispatch-response
        reply-timeout: 30s

在这里,我们配置请求和回复 Kafka 主题名称以及三十秒的回复超时。

除了自定义属性之外,我们还在application.yaml文件中添加一些核心 Kafka 配置属性:

spring:
  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      group-id: synchronous-kafka-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              packages: com.baeldung.kafka.synchronous
    properties:
      allow:
        auto:
          create:
            topics: true

首先,为了允许我们的应用程序连接到配置的 Kafka 实例,我们使用环境变量配置其引导服务器 URL 。

接下来,我们为消费者和生产者配置键和值的序列化和反序列化属性。此外,对于我们的消费者,我们配置一个group-id并信任包含我们的请求-回复记录的包以进行 JSON 反序列化。

配置上述属性后,Spring Kafka 会自动为我们创建ConsumerFactory和ProducerFactory类型的 bean 。我们将在下一节中使用它们来定义其他 Kafka 配置 bean。

最后,我们启用主题自动创建功能,这样 Kafka 便会在主题不存在时自动创建主题。需要注意的是,我们仅为了演示而启用此属性 — 在生产应用程序中不应执行此操作。

定义 Kafka 配置 Bean
有了配置属性,让我们定义必要的 Kafka 配置 bean:

@Bean
KafkaMessageListenerContainer<String, NotificationDispatchResponse> kafkaMessageListenerContainer(
    ConsumerFactory<String, NotificationDispatchResponse> consumerFactory
) {
    String replyTopic = synchronousKafkaProperties.replyTopic();
    ContainerProperties containerProperties = new ContainerProperties(replyTopic);
    return new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
}

首先,我们注入ConsumerFactory实例,并将其与配置的replyTopic一起使用以创建KafkaMessageListenerContainer bean。此 bean 负责创建一个容器,用于轮询来自我们的回复主题的消息。

接下来,我们将定义在服务层中用于执行同步通信的核心 bean:

@Bean
ReplyingKafkaTemplate<String, NotificationDispatchRequest, NotificationDispatchResponse> replyingKafkaTemplate(
    ProducerFactory<String, NotificationDispatchRequest> producerFactory,
    KafkaMessageListenerContainer<String, NotificationDispatchResponse> kafkaMessageListenerContainer
) {
    Duration replyTimeout = synchronousKafkaProperties.replyTimeout();
    var replyingKafkaTemplate = new ReplyingKafkaTemplate<>(producerFactory, kafkaMessageListenerContainer);
    replyingKafkaTemplate.setDefaultReplyTimeout(replyTimeout);
    return replyingKafkaTemplate;
}

使用ProducerFactory和先前定义的KafkaMessageListenerContainer bean,我们创建了一个ReplyingKafkaTemplate bean。此外,使用自动连接的synchronousKafkaProperties,我们配置了在application.yaml文件中定义的 reply-timeout,这将决定我们的服务在超时之前等待响应的时间。

这个ReplyingKafkaTemplate bean 管理请求和回复主题之间的交互,使得通过 Kafka 进行同步通信成为可能。

最后,让我们定义 bean 以使我们的监听器组件能够将响应发送回回复主题:

@Bean
KafkaTemplate<String, NotificationDispatchResponse> kafkaTemplate(ProducerFactory<String, NotificationDispatchResponse> producerFactory) {
    return new KafkaTemplate<>(producerFactory);
}
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, NotificationDispatchRequest>> kafkaListenerContainerFactory(
    ConsumerFactory<String, NotificationDispatchRequest> consumerFactory,
    KafkaTemplate<String, NotificationDispatchResponse> kafkaTemplate
) {
    var factory = new ConcurrentKafkaListenerContainerFactory<String, NotificationDispatchRequest>();
    factory.setConsumerFactory(consumerFactory);
    factory.setReplyTemplate(kafkaTemplate);
    return factory;
}

首先,我们使用ProducerFactory bean 创建一个标准的KafkaTemplate bean 。

然后,我们将其与ConsumerFactory bean一起使用来定义KafkaListenerContainerFactory bean。此 bean 使我们的监听器组件能够在完成所需处理后从请求主题消费消息,并将消息发送回回复主题。

使用Kafka实现同步通信
配置完成后,让我们在两个配置的 Kafka 主题之间实现同步请求-答复通信。

1. 使用ReplyingKafkaTemplate发送和接收消息
首先,让我们创建一个NotificationDispatchService类,该类使用我们之前定义的ReplyingKafkaTemplate bean将消息发送到配置的请求主题:

@Service
@EnableConfigurationProperties(SynchronousKafkaProperties.class)
class NotificationDispatchService {
    private final SynchronousKafkaProperties synchronousKafkaProperties;
    private final ReplyingKafkaTemplate<String, NotificationDispatchRequest, NotificationDispatchResponse> replyingKafkaTemplate;
    // standard constructor
    NotificationDispatchResponse dispatch(NotificationDispatchRequest notificationDispatchRequest) {
        String requestTopic = synchronousKafkaProperties.requestTopic();
        ProducerRecord<String, NotificationDispatchRequest> producerRecord = new ProducerRecord<>(requestTopic, notificationDispatchRequest);
        var requestReplyFuture = replyingKafkaTemplate.sendAndReceive(producerRecord);
        return requestReplyFuture.get().value();
    }
}

在这里,在我们的dispatch()方法中,我们使用自动连接的synchronousKafkaProperties实例来提取在application.yaml文件中配置的requestTopic 。然后,我们将它与方法参数中传递的notificationDispatchRequest一起使用来创建ProducerRecord实例。

接下来,我们将创建的ProducerRecord实例传递给sendAndReceive()方法,以将消息发布到请求主题。该方法返回一个RequestReplyFuture对象,我们使用它等待响应,然后返回其值。

在底层,当我们调用sendAndReceive()方法时,ReplyingKafkaTemplate类会生成一个唯一的关联 ID(随机 UUID),并将其附加到传出消息的 header 中。此外,它还会添加一个 header,其中包含它期望响应返回的回复主题名称。请记住,我们已经在KafkaMessageListenerContainer bean 中配置了回复主题。

ReplyingKafkaTemplate bean 使用生成的关联 ID 作为键,将 RequestReplyFuture 对象存储在线程安全的ConcurrentHashMap中。这使它即使在多线程环境中也能工作并支持并发请求。

.2. 定义 Kafka 消息监听器
接下来,为了完成我们的实现,让我们创建一个监听器组件,用于监听配置的请求主题中的消息并将响应发送回回复主题:

@Component
class NotificationDispatchListener {
    @SendTo
    @KafkaListener(topics = "${com.baeldung.kafka.synchronous.request-topic}")
    NotificationDispatchResponse listen(NotificationDispatchRequest notificationDispatchRequest) {
       
// ... processing logic
        UUID notificationId = UUID.randomUUID();
        return new NotificationDispatchResponse(notificationId);
    }
}

我们使用@KafkaListener注释来监听我们在application.yaml文件中配置的请求主题。

在我们的listen()方法中,我们仅返回一个包含唯一notificationId的NotificationDispatchResponse记录。

重要的是,我们用@SendTo注释注释我们的方法,它指示 Spring Kafka 从消息头中提取关联 ID 和回复主题名称。它使用它们自动将方法的返回值发送到提取的回复主题,并将相同的关联 ID 添加到消息头。

这允许我们的NotificationDispatchService类中的ReplyingKafkaTemplate bean使用它最初生成的关联 ID获取正确的RequestReplyFuture对象。