Spring Boot事务发件箱模式

如果您正在构建微服务架构,或者您只需要从整体式(单体)架构发送电子邮件,则应该研究事务发件箱模式以确保服务之间的可靠通信。这篇博文介绍了几种使用 Spring Boot 实现此目的的方法。

什么是事务发件箱模式?
克里斯·理查森(Chris Richardson)撰写的《微服务模式》一书向我介绍了这个概念。

事务发件箱是一种确保两个系统同步的方法,无需在这些系统之间使用分布式事务。一个简单的例子是将客户的订单存储在数据库中,并发送电子邮件来确认订单。

如果我们简单地实现这一点,我们可以这样做:

@Component
@Transactional
public class CompleteOrder {
  private final OrderRepository orderRepository;
  private final MailSender mailSender;

  public CompleteOrder(OrderRepository orderRepository, MailSender mailSender) {
    this.orderRepository = orderRepository;
    this.mailSender = mailSender;
  }

  public void execute(CompleteOrderParameters parameters) {
    Order order = createOrder(parameters);
    this.orderRepository.save(order);
    this.mailSender.notifyOrderRegistered(order);
  }
}

该类CompleteOrder是存储订单并发送电子邮件的用例。但是,如果出现问题怎么办?如果邮件提供商出现故障,则邮件永远不会发送给客户。更糟糕的是,交易将回滚,用户会收到错误。邮件服务器不存在不是客户的错。我们应该在几分钟后当邮件服务器恢复正常运行时重试发送电子邮件。

使用事务发件箱模式,我们可以通过存储我们应该先执行某些外部操作(发送电子邮件、将消息放入队列等)的事实来避免此问题。然后,异步进程可以查看数据库以了解还需要发生什么,并且可以在有时间时执行这些操作。如果外部系统不可用,则可以稍后重试该任务,直到成功为止。

使用 Spring Integration
我们可以使用Spring Integration来实现发件箱模式。这可以通过设置一个集成流来实现,该集成流将电子邮件消息作为输入,并将其传递到 JDBC 支持的输出,并使用轮询处理程序发送邮件。

项目设置
作为示例,让我们在start.spring.io上创建一个 Spring Boot 项目,其配置如下:

项目:Maven
语言:Java
Spring Boot:3.3.0
Java:21
依赖项:

  • Spring Web
  • Spring Data JPA
  • Spring 集成
  • Docker Compose 支持
  • PostgreSQL 驱动程序
迁徙路线
在生成的中pom.xml,手动添加spring-integration-jdbc依赖项:

pom.xml
  <dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-jdbc</artifactId>
  </dependency>

Spring Integration 设置

首先,我们通过添加此配置来配置 Spring Integration 本身:

SpringIntegrationConfiguration.java
package com.wimdeblauwe.examples.transactional_outbox_spring_integration.infrastructure.integration;

import javax.sql.DataSource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
import org.springframework.integration.jdbc.store.channel.PostgresChannelMessageStoreQueryProvider;

@Configuration
public class SpringIntegrationConfiguration {

  private static final String CONCURRENT_METADATA_STORE_PREFIX = "_spring_integration_";

  @Bean
  JdbcChannelMessageStore jdbcChannelMessageStore(
      DataSource dataSource) {
    JdbcChannelMessageStore jdbcChannelMessageStore = new JdbcChannelMessageStore(dataSource);
    jdbcChannelMessageStore.setTablePrefix(CONCURRENT_METADATA_STORE_PREFIX);
    jdbcChannelMessageStore.setChannelMessageStoreQueryProvider(
        new PostgresChannelMessageStoreQueryProvider());
    return jdbcChannelMessageStore;
  }
}

这个 bean 将会把我们添加到发件箱 Spring Integration 通道的对象持久保存在数据库中。

为了创建适当的表,我们使用了 Flyway 脚本,您可以在 GitHub上查看。

接下来,我们定义邮件的集成流程:

MailConfiguration.java

import java.time.Duration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;

@Configuration
public class MailConfigration {

  @Bean
  public DirectChannel mailInput() {
    return new DirectChannel();
  }

  @Bean
  public QueueChannel mailOutbox(JdbcChannelMessageStore jdbcChannelMessageStore) {
    return MessageChannels.queue(jdbcChannelMessageStore, "mail-outbox").getObject();
  }

  @Bean
  public IntegrationFlow mailFlow(JdbcChannelMessageStore jdbcChannelMessageStore,
      MailSender mailSender) {
    return IntegrationFlow.from(mailInput())
        .channel(mailOutbox(jdbcChannelMessageStore))
        .handle(message -> {
          MailMessage mailMessage = (MailMessage) message.getPayload();
          mailSender.sendMail(mailMessage);
        }, e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(1))
            .transactional()))
        .get();
  }
}

该配置有3个bean:
  1. mailInputMailMessage:这是将接收要发送的输入通道。
  2. mailOutbox:这是消息路由到的通道,将使用JdbcChannelMessageStore我们在SpringIntegrationConfiguration类中配置的存储消息。
  3. mailFlowmailInput:这定义了从到 的实际流程,mailOutbox并添加了一个handle()实际发送电子邮件的方法。它mailOutput每秒轮询一次 以查看是否有邮件要发送。由于 ,transactional()消息将保留在 上,mailOutbox直到发送成功。
该配置类使用了2个尚未解释的类:MailMessage和MailSender。

该类MailMessage是包含发送电子邮件所需信息的记录:

MailMessage.java

import java.io.Serial;
import java.io.Serializable;

public record MailMessage(String subject, String body, String to) implements Serializable {

  @Serial
  private static final long serialVersionUID = 1L;
}

请注意我们需要如何创建类Serializable以便 Spring Integration 可以将其存储在数据库中。

这MailSender是一个可以根据您想要发送电子邮件的方式以多种方式实现的接口:

MailSender.java

package com.wimdeblauwe.examples.transactional_outbox_spring_integration.infrastructure.mail;

public interface MailSender {

  void sendMail(MailMessage mailMessage);
}

为了进行测试,我实现了一个不可靠的邮件发送器,它会随机记录或抛出异常。实际上,您可能会使用 Java Mail 连接到 SMTP 服务器,或者使用 SendGrid 或 Amazon SES 等服务发送电子邮件。

LoggingMailSender.java

import java.util.random.RandomGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
public class LoggingMailSender implements
    MailSender {

  private static final Logger LOGGER = LoggerFactory.getLogger(LoggingMailSender.class);
  private final RandomGenerator randomGenerator = RandomGenerator.getDefault();

  @Override
  public void sendMail(MailMessage mailMessage) {
    if (randomGenerator.nextBoolean()) {
      LOGGER.info("Sending email: {}", mailMessage);
    } else {
      throw new RuntimeException(
"Email server down");
    }
  }
}

从应用程序发送电子邮件
为了利用 Spring Integration 流程,我们需要创建一个消息网关。这可以通过带有注释的接口完成@MessagingGateway:


import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;

@MessagingGateway
public interface MailGateway {

  @Gateway(requestChannel = "mailInput")
  void sendMail(MailMessage mailMessage);
}

注意,名称requestChannel必须与类中输入通道的 bean 的名称相匹配MailConfiguration。

我们不需要提供实现。Spring Integration 将在运行时为我们实现它。

使用此网关的示例用例可能如下所示:


import com.wimdeblauwe.examples.transactional_outbox_spring_integration.infrastructure.mail.MailGateway;
import com.wimdeblauwe.examples.transactional_outbox_spring_integration.infrastructure.mail.MailMessage;
import com.wimdeblauwe.examples.transactional_outbox_spring_integration.order.Order;
import com.wimdeblauwe.examples.transactional_outbox_spring_integration.order.repository.OrderRepository;
import java.math.BigDecimal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Component
@Transactional
public class CompleteOrder {

  private static final Logger LOGGER = LoggerFactory.getLogger(CompleteOrder.class);
  private final OrderRepository orderRepository;
  private final MailGateway mailGateway;

  public CompleteOrder(OrderRepository orderRepository, MailGateway mailGateway) {
    this.orderRepository = orderRepository;
    this.mailGateway = mailGateway;
  }

  public void execute(BigDecimal amount, String email) {
    LOGGER.info("Completing order for {}", email);
    Order order = new Order();
    order.setAmount(amount);
    order.setCustomerEmail(email);

    LOGGER.info(
"Save order in database");
    orderRepository.save(order); 

    MailMessage message = new MailMessage(
"Order %s completed".formatted(order.getId()),
       
"Your order is registered in our system and will be processed.",
        order.getCustomerEmail()); 
    LOGGER.info(
"Sending email for order");
    mailGateway.sendMail(message); 
  }
}
  1. 将其保存Order在数据库中。
  2. 编写电子邮件消息的数据。
  3. 将数据传递给MailGateway发送电子邮件。
从用例方面来看,似乎我们同步发送电子邮件,但实际上,与MailMessage存储在同一个事务中,Order并且邮件本身几分钟后异步发送。

测试
为了测试一切是否正常,我们可以创建一个 REST 控制器来触发用例:

import com.wimdeblauwe.examples.transactional_outbox_spring_integration.order.usecase.CompleteOrder;
import java.math.BigDecimal;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/orders")
public class OrderRestController {

  private final CompleteOrder completeOrder;

  public OrderRestController(CompleteOrder completeOrder) {
    this.completeOrder = completeOrder;
  }

  @PostMapping
  public void completeOrder(@RequestBody CompleteOrderRequest request) {
    completeOrder.execute(request.amount(), request.email());
  }

  public record CompleteOrderRequest(BigDecimal amount, String email) {

  }
}

使用 IntelliJ 或任何其他工具的 HTTP 客户端发送请求,我们可以添加一些命令:

POST http://localhost:8080/orders
Content-Type: application/json

{
 
"amount": "100.0",
 
"email": "test@example.com"
}

如果您检查应用程序的日志记录,您有时会看到无法发送电子邮件的堆栈跟踪,但不久之后您就会看到最有可能成功的重试。

我们这里的示例使用 PostgreSQL,但如果您改用 MySQL,则需要进行一些更改。在底层,Spring Integration 使用SKIP LOCK,但 MySQL 不支持这一点。

您可以执行以下操作使其与 MySQL 一起工作:

1、定义TransactionInterceptor隔离READ_COMMITTED级别为SpringIntegrationConfiguration:

SpringIntegrationConfiguration.java
  @Bean
  public TransactionInterceptor springIntegrationTransactionInterceptor() {
    return new TransactionInterceptorBuilder()
        .isolation(Isolation.READ_COMMITTED)
        .build();
  }

2、更新mailFlowbean 来使用该拦截器:

  @Bean
  public IntegrationFlow mailFlow(JdbcChannelMessageStore jdbcChannelMessageStore,
      MailSender mailSender,
      @Qualifier("springIntegrationTransactionInterceptor") TransactionInterceptor transactionInterceptor) { 
    return IntegrationFlow.from(mailInput())
        .channel(mailOutbox(jdbcChannelMessageStore))
        .handle(message -> {
          MailMessage mailMessage = (MailMessage) message.getPayload();
          mailSender.sendMail(mailMessage);
        }, e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(1))
            .transactional(transactionInterceptor))) 
        .get();
  }
  • 将 TransactionInterceptor 声明为参数,以便 Spring 能注入它。 我们需要使用限定符,以确保获得我们在 SpringIntegrationConfiguration 中声明的限定符。
  • 将拦截器interceptor 作为参数用于 Transactional() 方法。


Spring Modulith
Spring Modulith 是 Spring 产品组合中的一个新项目。它由 Oliver Drotbohm 领导,旨在让使用 Spring 构建模块化单体应用程序变得更加容易。

模块之间的通信可以通过使用 Spring 核心异步完成ApplicationEventPublisher。Spring Modulith 具有额外的基础架构,通过首先将其存储在数据库中来确保此类事件永远不会丢失。我们可以利用这一点来构建我们的发件箱模式。

项目设置
在start.spring.io上创建一个 Spring Boot 项目,配置如下:

项目:Maven
语言:Java
Spring Boot:3.3.0
Java:21
依赖项:

  • Spring Web
  • Spring Data JPA
  • Spring Modulith
  • Docker Compose Support
  • PostgreSQL Driver
  • Flyway Migration

替换spring-modulith-starter-jpa为spring-modulith-starter-jdbc:

pom.xml

<dependency>
    <groupId>org.springframework.modulith</groupId>
    <artifactId>spring-modulith-starter-jdbc</artifactId>
</dependency>

在此示例中,我们将从用例中发布一个OrderCompleted事件。事件本身是一个简单的记录,其中包含对订单 ID 的引用:

public record OrderCompleted(Long orderId) {

}

用例发布事件,发布OrderCompleted事件:


@Component
@Transactional
public class CompleteOrder {

  private static final Logger LOGGER = LoggerFactory.getLogger(CompleteOrder.class);
  private final OrderRepository orderRepository;
  private final ApplicationEventPublisher eventPublisher;

  public CompleteOrder(OrderRepository orderRepository, ApplicationEventPublisher eventPublisher) {
    this.orderRepository = orderRepository;
    this.eventPublisher = eventPublisher;
  }

  public void execute(BigDecimal amount, String email) {
    LOGGER.info("Completing order for {}", email);
    Order order = new Order();
    order.setAmount(amount);
    order.setCustomerEmail(email);

    LOGGER.info(
"Save order in database");
    orderRepository.save(order);

    eventPublisher.publishEvent(new OrderCompleted(order.getId())); 
  }
}

现在我们可以创建一个 Spring 组件来监听事件并发送邮件通知:

@Component
public class MailNotifier {

  private static final Logger LOGGER = LoggerFactory.getLogger(MailNotifier.class);
  private final MailSender mailSender;
  private final OrderRepository orderRepository;

  public MailNotifier(MailSender mailSender, OrderRepository orderRepository) {
    this.mailSender = mailSender;
    this.orderRepository = orderRepository;
  }

  @ApplicationModuleListener 
  public void onOrderCompleted(OrderCompleted orderCompleted) {
    Order order = orderRepository.findById(orderCompleted.orderId())
        .orElseThrow(() -> new RuntimeException("Order not found"));

    MailMessage message = new MailMessage(
"Order %s completed".formatted(order.getId()),
       
"Your order is registered in our system and will be processed.",
        order.getCustomerEmail());
    LOGGER.info(
"Sending email for order {}", orderCompleted.orderId());
    mailSender.sendMail(message);
  }
}

将该方法标记为 @ApplicationModuleListener 方法。 这是 Spring Modulith 提供的一个注解,由以下内容组合而成:
  • @Async:因为我们希望以异步方式发送邮件。 我们不希望 CompleteOrder 用例的处理受到电子邮件发送的影响。
  • @Transactional: 由于我们的监听器是在单独的线程中运行的,所以我们应该启动一个新事务来从存储库中获取订单的状态。
  • @TransactionalEventListener: 这样可以确保在包含发送事件的事务完成时调用此方法。 如果事务回滚,我们的监听器就不会被调用。

我们可以再次使用 IntelliJ HTTP 客户端进行测试,并注意到有时邮件发送正确,有时发送失败(因为我们的邮件发送器有随机失败代码)。如果我们检查数据库,我们可以看到事件是否已存储并标记为已发布:

6fcaa30a-2b36-4f10-a091-4ce10ab520ea

MailNotifier.onOrderCompleted(OrderCompleted)

OrderCompleted

{"orderId":1}

2024-06-13 05:50:43.090615 +00:00

2024-06-13 05:50:43.148320 +00:00

这里的优点是事件被序列化为 JSON,因此数据库中可以读取它所包含的内容。使用 Spring Integration,它使用 Java 序列化,因此您只能获得毫无意义的字节。

重试失败事件
与 Spring Integration 不同,没有自动重试,但我们可以轻松添加它。

第一种方法是设置一个属性,在应用程序启动时重试事件:

application.properties
spring.modulith.republish-outstanding-events-on-restart=true

如果您有失败的事件并重新启动 Spring Boot 应用程序,您会注意到事情会重试。但是,我怀疑这是否真的有用,因为通常您不会重新启动应用程序那么多。

更好的方法是时不时地查询未发布的事件并重新发布它们。为了实现这一点,我们可以MailNotifier像这样更新:

@Component
public class MailNotifier {

  private static final Logger LOGGER = LoggerFactory.getLogger(MailNotifier.class);
  private final MailSender mailSender;
  private final OrderRepository orderRepository;
  private final IncompleteEventPublications incompleteEventPublications;

  public MailNotifier(MailSender mailSender, OrderRepository orderRepository, IncompleteEventPublications incompleteEventPublications)  { 
    this.mailSender = mailSender;
    this.orderRepository = orderRepository;
    this.incompleteEventPublications = incompleteEventPublications;
  }

  @Scheduled(fixedRate = 5, timeUnit = TimeUnit.SECONDS) 
  public void retries() {
    this.incompleteEventPublications.resubmitIncompletePublicationsOlderThan(Duration.ofSeconds(5)); 
  }

  // ... other code below
}
  •  注入 Spring Modulith 的 IncompleteEventPublication 接口。
  • 在公共方法中添加 @Scheduled 并设定一定的轮询频率。 在我们的示例中,Spring 将每 5 秒钟调用一次该方法。
  • 重新发布任何超过 5 秒钟的未完成事件。


通过此设置,应用程序运行时将重试失败的事件。

消息排序
Spring Integration 解决方案与 Spring Modulith 解决方案的一个重要区别是,使用 Spring Integration 时,顺序会保留,一条消息失败将阻止处理下一条消息。使用 Spring Modulith 时,由于应用程序模块侦听器是异步调用的,因此将同时执行对各个事件发布的重试。因此,无法保证它们最终出现在电子邮件服务器中的顺序。

在我们发送电子邮件的示例中,上一条消息失败时无需停止发送下一条消息。但在其他场景中(例如将消息放在 Kafka 上),您可能确实关心消息顺序。

运行多个实例
另一个重要的区别是当您运行应用程序的多个实例时。

使用 Spring Integration,电子邮件将从其中一个实例发送。因此不会出现重复电子邮件,并且如果执行重试的那个实例失败,另一个实例将自动接管。

使用 Spring Modulith,如果一切顺利,我们也不会发送重复的电子邮件。但是@Scheduled注释是由两个实例完成的,如果有两个实例在运行,则会导致发送重复的电子邮件。我们可以通过使用ShedLock来解决这个问题,例如,只有一个实例执行事件重试。

结论
Spring Integration 和 Spring Modulith 都可用于构建事务发件箱,以更确定您的主数据库操作和对外部系统的任何通知是否同步且不会丢失。然而,Spring Integration 解决方案似乎确实比 Spring Modulith 解决方案有一些优势。

请参阅GitHub 上的transactional-outbox-spring-integration 和 transactional-outbox-spring-modulith 以获取这些示例的完整源代码。