使用 Apache Kafka 和 Spring Modulith 实现发件箱模式


在这篇博客中,我们将探讨事件驱动系统中常见的“双写”问题,以及如何使用 Spring Modulith 来简单地实现 Outbox 模式来解决该问题。

在构建任何涉及多个组件的非复杂系统时,迟早会遇到需要以一致的方式更新两个系统的解决方案设计。通常情况下,一个系统是数据库,而另一个系统可以是微服务架构中的另一个服务,或者是事件驱动架构中的事件存储。这种情况有时被称为双写问题。

当涉及的系统是数据库和事件存储时,一种流行的解决方案是应用 Outbox 模式。在了解该模式之前,我们先通过一个示例来理解为什么需要它。

假设收到创建订单的命令,订单的状态需要持久化到数据库中,域事件 ORDER_CREATED 需要发布到 Apache Kafka 主题中。让我们来看看可能的选项。

双写方式一:将事件发布到Kafka并将数据保存到DB
这是伪代码的方法:

sendEventToKafka();
startTransaction();
saveOrderInDB();
commitTransaction();

该事件被发布到 Kafka,然后将订单保存在数据库中。但是如果事件发布后,到数据库的本地事务失败怎么办?已发布的事件无法回滚,导致系统不一致。

双写方法2:将数据保存在DB中并将事件发布到Kafka
如果执行顺序颠倒会发生什么?

startTransaction();
saveOrderInDB();
sendEventToKafka();
commitTransaction();

乍一看,这似乎解决了一致性问题。但它还面临着其他并不明显的问题。订单的创建现在取决于 Kafka 的可用性。虽然订单服务本身可用,但如果 Kafka 宕机,则无法满足请求。 

高负载时会出现一个更微妙的问题。如果 Kafka 不可用,事务将回滚,并且数据库不会更新,但事务(和数据库连接)将保持打开状态,直到事件发布到 Kafka。如果负载足够,数据库连接池可能会耗尽。一般来说,DB事务不应该执行IO或网络相关的操作。

如果事件异步发布到Kafka怎么办?

[thread 1] startTransaction();
[thread 1] saveOrderInDB();
[thread 1] commitTransaction();
[thread 2] sendEventToKafka();

这种方法避免了请求对 Kafka 的依赖,但仍然存在不一致的问题。由于事件是在数据库事务提交后异步发布到Kafka的,因此如果事件发布失败,则不存在回滚的可能性。

看来我们必须处理系统不一致或系统不可用的风险。

什么是发件箱模式?
在发件箱模式中,不是将事件发布到 Kafka,而是将记录添加到特殊的数据库表(称为“发件箱”)中以捕获意图发布事件。由于这是发生原始持久化的同一个数据库,因此可以重用同一个事务,并且不再是双写问题,并且可以在同一个事务中轻松执行。这是伪代码:

startTransaction();
saveOrderInDB();
saveIntentInOutboxTable();
commitTransaction();

然后,一个单独的异步进程监视发件箱表并发布事件。这解决了上述双写方法中提到的问题。 

系统保证最终一致,因为后台进程将发布事件。 Order的创建不再依赖于Kafka的可用性。最后,数据库事务不涉及任何昂贵的IO或网络操作,防止对高负载产生不利影响。

当然,也有一个缺点。后台进程保证至少发布一次事件。可能会出现重复!这迫使所有希望使用该事件的服务都是幂等的。在事件驱动架构中,建议始终使用幂等消费服务来正确处理错误场景。

发件箱模式自然会增加系统的复杂性。就开发和测试而言,手动实现该模式将是一项艰巨的任务。实现发件箱模式的一种流行方法是使用 Debezium 和变更数据捕获。但这除了 Kafka 本身之外还需要 Kafka Connect 集群,这是一项重大的基础设施投资。

在本博客中,我们将了解一种使用 Spring Modulith 库实现 Outbox 模式的新方法。它不需要复杂的基础设施设置,是 Spring 项目的理想解决方案。

 Spring Modulith发件箱模式
Spring Modulith 是庞大的 Spring 库生态系统的最新成员。它提供了构建易于维护和发展的模块化整体应用程序的工具。

让我们通过一个示例来了解发件箱模式的实现。我们有两项服务:订单和通知。创建订单后,订单详细信息将保留在数据库中。事件 ORDER_CREATED 也会发布到 Kafka。通知服务使用此事件并触发通知。

订单服务是一个模块化整体应用程序,由两个模块组成 - 订单Order 和运输Shipping :
当 ORDER_CREATED 事件发布时,Shipping 模块也会使用该事件并启动运输流程。但由于 Order 和 Shipping 是同一应用程序中的模块,因此它不会使用来自 Kafka 的事件,而是使用 ApplicationEventPublisher – Spring 的内部事件发布功能。

我们将使用 Spring Boot 构建订单服务。首先让我们看一下 Order 整体的打包结构。

src/main/java
└── example
    ├── order
    │   ├── internal
    │   │   ├── Order
    │   │   ├── OrderController
    │   │   ├── OrderManagement
    │   │   └── OrderRepository
    │   └── OrderCreated
    └── shipping
        └── Shipping


添加 Spring Modulith 库需要以下依赖项。完整的 pom.xml 可以在这里找到:https://gitlab.com/axual/public/outbox-pattern-with-spring-modulith/order/pom.xml。

<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.springframework.modulith</groupId>
      <artifactId>spring-modulith-bom</artifactId>
      <version>1.2.0-SNAPSHOT</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

<dependency>
    <groupId>org.springframework.modulith</groupId>
    <artifactId>spring-modulith-starter-core</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.modulith</groupId>
    <artifactId>spring-modulith-starter-jpa</artifactId>
</dependency>

spring-module-starter-jpa 依赖项启用事件发布注册表。该注册表由底层持久性技术(在本例中为 H2)提供支持。它为 H2 创建一个具有以下架构的表 EVENT_PUBLICATION:

CREATE TABLE IF NOT EXISTS EVENT_PUBLICATION
(
  ID               UUID NOT NULL,
  COMPLETION_DATE  TIMESTAMP(9) WITH TIME ZONE,
  EVENT_TYPE       VARCHAR(512) NOT NULL,
  LISTENER_ID      VARCHAR(512) NOT NULL,
  PUBLICATION_DATE TIMESTAMP(9) WITH TIME ZONE NOT NULL,
  SERIALIZED_EVENT VARCHAR(4000) NOT NULL,
  PRIMARY KEY (ID)
)

可以在此处找到其他支持的数据库的架构:schemas>https://docs.spring.io/spring-modulith/reference/appendix.htmlschemas
当 Spring 发布事件ApplicationEventPublisher时,注册表会找到所有预期接收该事件的事务事件侦听器,并在上表中写入一个条目。默认情况下,该条目被视为不完整(COMPLETION_DATE 列为 NULL)。当事务事件侦听器成功完成时,该条目将标记为已完成。

订单和运输这两个模块表示为顶级包。为了将 Order 保存在数据库中,我们需要 Order 实体。

@Entity
@Getter
@NoArgsConstructor
@Table(name = "orders")
public class Order {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private String product;

    @Enumerated(EnumType.STRING)
    private OrderStatus status;

    public Order(String product) {
        this.product = product;
        this.status = OrderStatus.CREATED;
    }

    public enum OrderStatus {
        CREATED, COMPLETED
    }
}

创建Order时生成的事件用 Java 记录表示。

public record OrderCreated(Long id, String product) {
}

OrderManagement 类负责创建 Order 并发布 OrderCreated 事件。

@Slf4j
@Service
@Transactional
@RequiredArgsConstructor
public class OrderManagement {

    private final OrderRepository orders;
    private final ApplicationEventPublisher events;

    public Order create(String product) {

        var order = orders.save(new Order(product));

        events.publishEvent(new OrderCreated(order.getId(), order.getProduct()));

        log.info("Order created");

        return order;
    }
}

我们使用Spring内置的ApplicationEventPublisher来发布OrderCreated事件。请注意,这不会将事件发布到 Kafka。该事件保留在 JVM 中,并移交给同一 JVM 中的任何 EventListener。

该事件由运输模块消耗。这是 Shipping 事件监听器。

@Slf4j
@Service
@Transactional
@RequiredArgsConstructor
public class Shipping {

    @ApplicationModuleListener
    void on(OrderCreated event) {
        ship(event.id());
    }

    private void ship(Long orderId) {
        log.info("Started shipping for order {}", orderId);
    }
}

注意注释@ApplicationModuleListener。它来自 Spring Modulith 库,是 @Async 和 @TransactionalEventListener 的快捷方式。根据 Javadoc,“该设置确保原始业务事务成功完成,并且集成在事务本身中异步运行,以尽可能地将集成与原始工作单元解耦”。

这种事件驱动的方法确保订单和运输这两个模块是松散耦合的。如果运输模块不可用(技术上不太可能,因为它们作为整体运行),则订单模块将完全发挥作用。

将事件发布到 Kafka
此时,我们的整体模块中的模块运行良好并通过事件进行通信。但我们还需要将事件发布到 Kafka,以确保通知服务能够使用它并触发通知。

使用 Spring Modulith,实现起来很简单。我们需要将事件外部化到我们选择的事件代理(在本例中为 Kafka)。让我们在 pom.xml 中添加所需的依赖项。

<dependency>
    <groupId>org.springframework.modulith</groupId>
    <artifactId>spring-modulith-events-kafka</artifactId>
</dependency>

此依赖项会自动拉入 Spring Kafka 库。它将查找带有 @Externalized 注释的事件,并自动将事件发布到 Kafka 主题。

@Externalized("order-created::#{id()}")
public record OrderCreated(Long id, String product) {
}

该注解接受一个主题名称和一个信息关键标识符。这里的主题名称是订单创建,OrderCreated 记录中的 id() 函数是获取key的策略。

实施通知服务
通知服务将侦听 Kafka 主题中的事件order-created。这可以通过标准 Spring Kafka 消费者来实现。

@Slf4j
@SpringBootApplication
public class NotificationApplication {

    public static void main(String[] args) {
        SpringApplication.run(NotificationApplication.class);
    }

    @KafkaListener(topics = "order-created", groupId = "notification")
    public void notify(OrderCreated event) {
        log.info(
"Notifying user for created order {} and product {}", event.id(), event.product());
    }
}

在“notification”文件夹下运行相同的命令来启动通知服务。它应该立即使用来自 Kafka 的消息并记录一行:“通知用户创建订单 1 和产品“咖啡”。

本博客中引用的所有代码均可在此处获取:https://gitlab.com/axual/public/outbox-pattern-with-spring-modulith。该存储库包含一个用于快速启动服务的 docker-compose 文件、用于本地分布式跟踪的 Kafka 集群和 Zipkin。