使用Eventuate实现微服务CDC事务消息传递

在本文中,我们探讨了事务性消息传递的复杂性,从原子地执行数据库操作和发布域事件的挑战开始。我们发现了隐藏的困难,并了解了事务发件箱模式如何帮助解决这些困难。

然后,我们使用EventuateTram框架,它为我们实现了这个模式。通过使用Eventuate CDC Service(它利用变更数据捕获来监控发件箱表并向Kafka发送消息),我们实现了最终的一致性,并保证了系统中的至少一次交付。

在本教程中,我们将探讨维护数据库操作和消息传递之间的数据一致性的挑战。我们将开始检查问题,然后实现事务发件箱模式来解决关键问题。

接下来,我们将引入EventuateTram,用准备发布到特定主题的消息填充发件箱表。最后,我们将在自己的Docker容器中运行Eventuate CDC Service,以监视发件箱表中的更改,并通过Kafka发布相应的消息。

什么时候需要transmitting?
就像我们经常依赖数据库事务来确保数据操作的原子性一样,我们也可能需要以原子方式将消息发布到消息代理。例如,有时我们需要将数据保存到数据库,并将消息发布到消息代理作为单个原子操作。

虽然这看起来很简单,但它带来了一些隐藏的挑战。让我们使用一个简单的用例来探讨这个问题,我们将尝试将一个Comment实体保存到数据库,并将一个事件发布到sample.comment.added Kafka主题。

一种简单的方法是在事务块中发布消息。
例如,如果我们使用Spring Data JPA进行数据库操作,使用KafkaTemplate发送消息,我们的域服务可能看起来像这样:

@Service
class CommentService {
    private final CommentRepository comments;
    private final KafkaTemplate<Long, CommentAddedEvent> kafkaTemplate;
 
    // constructor
   
    @Transactional
    public Long save(Comment comment) {
        Comment saved = this.comments.save(comment);
        log.info("Comment created: {}", saved);
        CommentAddedEvent commentAdded = new CommentAddedEvent(saved.getId(), saved.getArticleSlug());
        kafkaTemplate.send("sample.comment.added", saved.getId(), commentAdded);
    }
}

但是,这种方法会在数据库提交发生之前发布Kafka消息。换句话说,即使事务在提交时失败,我们也有发送消息的风险,并且操作被回滚。

另一方面,如果我们尝试删除@ transmitting annotation,当发布到Kafka失败时,Spring不会回滚DB插入。

不用说,这两种方法都不是理想的,因为这两种方法都可能导致系统之间的数据不一致。

transmitting发件箱模式
我们可以实现transmitting发件箱模式,以确保系统中的最终一致性。此模式涉及将消息保存在特殊的数据库表中(即,“发件箱”)在同一个事务中作为我们的数据更改。

之后,一个单独的进程读取发件箱并将消息发布到消息代理。然后,它会更新、删除或将记录标记为已发布,以跟踪已发送的内容:

Transactional Outbox Pattern

发布事件和更新发件箱表时也可能发生类似的问题。

  • 我们希望避免在事件成功发布之前更新发件箱,以防止丢失事件。
  • 另一方面,如果发送了事件但数据库更新失败,系统可能会重试并再次发送事件。这可能会导致重复的事件,但总比丢失它们好。

总的来说,这种方法确保了“至少一次”交付,将可靠性置于避免重复之上。

演示应用程序概述
在本文中,我们将使用一个简单的Spring靴子应用程序,该应用程序为sample这样的博客站点管理文章评论。用户可以通过向/API/articles/{slug}/comments端点发送POST请求来为文章添加评论:

curl --location "http://localhost:8080/api/articles/oop-best-practices/comments" \
--header "Content-Type: application/json" \
--data "{
    \"articleAuthor\": \"Andrey the Author\",
    \"text\": \"Great article!\",
    \"commentAuthor\": \"Richard the Reader\"
}"

为了快速测试,我们可以使用位于src/rest/resources中的post-comment.bat脚本运行这个curl命令。

当一个Comment实体被保存到数据库时,系统也会发布一条Kafka消息。此消息包括新保存的评论ID和文章slug,并发送到名为sample.comment.added的主题。

为了设置本地环境,我们将使用Docker为PostgreSQL、Kafka和Eventuate的CDC服务启动容器。这可以很容易地使用位于src/test/resources中的eventuate-docker-compose.yml文件完成。我们还将使用eventuate配置文件在本地启动Spring靴子应用程序:

Transactional Outbox Pattern - Components Involved

要查看实践中的所有内容,我们还可以参考我们的集成测试EventuateTramLiveTest。

Eventuate
Eventuate是一个Java平台,支持核心微服务模式,如CQRS、事件源和事务Sagas。其组件之一EventuateTram通过事务发件箱模式和事件发布实现了可靠的服务间通信。

让我们集成Eventuate Tram,以确保在我们的示例中至少一次交付Kafka消息。首先,我们将eventuate-tram-spring-jdbc-Kafka和eventuate-tram-spring-events的必要依赖项添加到pom.xml中:

<dependency>
    <groupId>io.eventuate.tram.core</groupId>
    <artifactId>eventuate-tram-spring-jdbc-kafka</artifactId>
    <version>0.36.0-RELEASE</version>
</dependency>
<dependency>
    <groupId>io.eventuate.tram.core</groupId>
    <artifactId>eventuate-tram-spring-events</artifactId>
    <version>0.36.0-RELEASE</version>
</dependency>

然后,我们将导入两个配置类:

@Configuration
@Import({
    TramEventsPublisherConfiguration.class,
    TramMessageProducerJdbcConfiguration.class
})
class EventuateConfig {
}

此外,我们需要更改CommentAddedEvent记录,并确保它实现Eventuate的DomainEvent接口:

record CommentAddedEvent(Long id, String articleSlug) implements DomainEvent {
}

最后,我们将重构域服务,它包含所有逻辑。这一次,我们将使用DomainEventPublisher bean来发布CommentAddedEvent,而不是直接发布到Kafka:

@Service
class CommentService {
    private final CommentRepository comments;
    private final DomainEventPublisher domainEvents;
    // constructor
    @Transactional
    public Long save(Comment comment) {
        Comment saved = this.comments.save(comment);
        log.info("Comment created: {}", saved);
        CommentAddedEvent commentAdded = new CommentAddedEvent(saved.getId(), saved.getArticleSlug());
        domainEvents.publish(
            "sample.comment.added",
            saved.getId(),
            singletonList(commentAdded)
        );
        return saved.getId();
    }
}

因此,每当我们坚持一个评论实体,我们还将插入一个评论AddedEvent进入结果消息同一事务中的表。

让我们通过连接到数据库并查询comment表来验证这一点:

mydb=<strong>select * from comment;</strong>
 id |   article_slug    |   comment_author   |      text       
----+-------------------+--------------------+------------------
  1 | oop-best-practices | Richard the Reader | Great article!
(1 row)

让我们也从eventuate模式查询消息表。假设CDC服务关闭,我们可以预期只检索一条标记为未发布的消息:

mydb=<strong>select id, destination, published from eventuate.message;</strong>
                  id                  |        destination         | published 
--------------------------------------+----------------------------+-----------
 0000019713d8ffe4-e86a640584cf0000    | sample.comment.added     |     0
(1 row)

Eventuate的CDC服务
变更数据捕获(CDC)是一种用于检测和跟踪数据库中的更改(如插入、更新和删除)的技术,以便捕获这些更改并将其发送到其他系统。因此,EventuateCDC服务是捕获发件箱表的更改并将其作为事件发布到消息代理的组件。

Eventuate CDC服务目前支持多种消息代理,包括Apache Kafka、ActiveMQ、RabbitMQ和Redis。对于数据库,它使用高效的事务日志,通过binlog协议跟踪MySQL,并使用WAL跟踪Postgres。或者,对于其他与JDBC兼容的数据库,它福尔斯采用效率较低的轮询方法来检测更改。

如果我们启动CDC服务并重新运行测试,我们会注意到eventuate.messages表中的条目将被标记为已发布:

mydb=<strong>select id, destination, published from eventuate.message;</strong>
                  id                  |        destination         | published 
--------------------------------------+----------------------------+-----------
 0000019713d8ffe4-e86a640584cf0000    | sample.comment.added     |     1
(1 row)

最后,我们可以使用kafka-console-consumer.sh来验证消息是否成功发布到我们的主题:

{
  "payload": "{ \"id\": 1, \"articleSlug\": \"oop-best-practices\" }",
  "headers": {
    "PARTITION_ID": "1",
    "event-aggregate-type": "sample.comment.added",
    "DATE": "Tue, 27 May 2025 22:24:37 GMT",
    "event-aggregate-id": "1",
    "event-type": "com.sample.eventuate.tram.domain.CommentAddedEvent",
    "DESTINATION": "sample.comment.added",
    "ID": "0000019713d8ffe4-e86a640584cf0000"
  }
}

正如预期的那样,消息被传递,发件箱表也相应地更新。