在本文中,我们将讨论在@Transactional块中发布消息的需求以及相关的性能挑战,例如延长的数据库连接时间。为了解决这个问题,我们将利用Spring Modulith的功能来监听 Spring 应用程序事件并自动将它们发布到Kafka主题。
事件外部化即从模块内部向外发布事件到其他地方。
事务操作和消息代理
对于本文的代码示例,我们假设我们正在编写负责在 Baeldung 上保存文章的功能:
@Service class Baeldung { private final ArticleRepository articleRepository; // constructor @Transactional public void createArticle(Article article) { validateArticle(article); article = addArticleTags(article); // ... other business logic articleRepository.save(article); } }
|
此外,我们需要将这条新Article通知系统的其他部分。有了这些信息,其他模块或服务将做出相应反应,创建报告或向网站读者发送新闻通讯。实现此目的的最简单方法是注入知道如何发布此事件的依赖项。对于我们的示例,让我们使用KafkaOperations向“ baeldung.articles.published ”主题发送消息,并使用Article的slug()作为键:
@Service class Baeldung { private final ArticleRepository articleRepository; private final KafkaOperations<String, ArticlePublishedEvent> messageProducer; // constructor @Transactional public void createArticle(Article article) { // ... business logic validateArticle(article); article = addArticleTags(article); article = articleRepository.save(article); messageProducer.send( "baeldung.articles.published", article.slug(), new ArticlePublishedEvent(article.slug(), article.title()) ).join(); } }
|
然而,由于一些不同的原因,这种方法并不理想。从设计的角度来看,我们将领域服务与消息生产者耦合起来。此外,领域服务直接依赖于较低级别的组件,这违反了基本的清洁架构规则之一。此外,这种方法还会产生性能影响,因为一切都发生在@Transacional方法中。因此,为保存文章而获取的数据库连接将保持打开状态,直到消息成功发布。
最后,保存实体和发布消息将作为原子操作完成。换句话说,如果生产者未能发布事件,数据库事务将被回滚。
使用 Spring 事件进行依赖反转
我们可以利用Spring Events来改进我们解决方案的设计。我们的目标是避免直接从我们的域服务将消息发布到Kafka。让我们删除KafkaOperations依赖项并发布内部应用程序事件:
@Service public class Baeldung { private final ApplicationEventPublisher applicationEvents; private final ArticleRepository articleRepository; // constructor @Transactional public void createArticle(Article article) { // ... business logic validateArticle(article); article = addArticleTags(article); article = articleRepository.save(article); applicationEvents.publishEvent( new ArticlePublishedEvent(article.slug(), article.title())); } }
|
除此之外,我们还将有一个专门的 Kafka 生产者作为我们基础设施层的一部分。该组件将侦听 ArticlePublishedEvent并将发布委托给底层KafkaOperations bean:@Component class ArticlePublishedKafkaProducer { private final KafkaOperations<String, ArticlePublishedEvent> messageProducer; // constructor @EventListener public void publish(ArticlePublishedEvent article) { Assert.notNull(article.slug(), "Article Slug must not be null!"); messageProducer.send("baeldung.articles.published", article.splug(), event); } }
|
通过这种抽象,基础设施组件现在依赖于领域服务生成的事件。换句话说,我们已经成功地减少了耦合并反转了源代码依赖。此外,如果其他模块对文章创建感兴趣,它们现在可以无缝监听这些应用程序事件并做出相应反应。原子与原子 非原子操作
现在,让我们深入研究性能考虑因素。首先,我们必须确定当与消息代理的通信失败时回滚是否是所需的行为。这种选择根据具体情况而有所不同。
如果我们不需要这种原子性,则必须释放数据库连接并异步发布事件。为了模拟这一点,我们可以尝试创建一篇没有slug 的文章, 导致ArticlePublishedKafkaProducer::publish失败:
@Test void whenPublishingMessageFails_thenArticleIsStillSavedToDB() { var article = new Article(null, "Introduction to Spring Boot", "John Doe", "<p> Spring Boot is [...] </p>"); baeldung.createArticle(article); assertThat(repository.findAll()) .hasSize(1).first() .extracting(Article::title, Article::author) .containsExactly("Introduction to Spring Boot", "John Doe"); }
|
如果我们现在运行测试,它将失败。发生这种情况是因为ArticlePublishedKafkaProducer抛出异常,导致域服务回滚事务。但是,我们可以通过将@EventListener注解替换为@TransactionalEventListener和@Async来使事件监听器异步:@Async @TransactionalEventListener public void publish(ArticlePublishedEvent event) { Assert.notNull(event.slug(), "Article Slug must not be null!"); messageProducer.send("baeldung.articles.published", event); }
|
如果我们现在重新运行测试,我们会注意到异常已记录,事件未发布,并且实体已保存到数据库中。而且,数据库连接释放得更快,允许其他线程使用它。使用 Spring Modulith 进行事件外部化
我们通过两步方法成功解决了原始代码示例的设计和性能问题:
使用 Spring 应用程序事件进行依赖反转
利用@TransactionalEventListener和@Async进行异步发布
Spring Modulith 允许我们进一步简化代码,为该模式提供内置支持。 让我们首先将spring-modulith-events-api的 Maven 依赖项添加到pom.xml中:
<dependency> <groupId>org.springframework.modulith</groupId> <artifactId>spring-modulith-events-api</artifactId> <version>1.1.2</version> </dependency>
|
该模块可以配置为侦听应用程序事件并自动将它们外部化到各种消息系统。我们将坚持原来的示例并重点关注 Kafka。对于此集成,我们需要添加spring-modulith-events-kafka依赖项:<dependency> <groupId>org.springframework.modulith</groupId> <artifactId>spring-modulith-events-kafka</artifactId> <version>1.1.2</version> </dependency>
|
现在,我们需要更新ArticlePublishedEvent 并使用@Externalized对其进行注释。此注释需要路由目标的名称和密钥。换句话说,Kafka 主题和消息键。对于键,我们将使用SpEL表达式来调用Article :: slug():@Externalized("baeldung.article.published::#{slug()}") public record ArticlePublishedEvent(String slug, String title) { }
|
事件外部化配置
尽管@Externalized注释的值对于简洁的 SpEL 表达式很有用,但在某些情况下我们可能希望避免使用它:
如果表达式变得过于复杂
当我们的目标是将有关主题的信息与应用程序事件分开时
如果我们想要应用程序事件和外部化事件的不同模型
对于这些用例,我们可以使用EventExternalizationConfiguration 的构建器配置必要的路由和事件映射。之后,我们只需将此配置公开为Spring bean:
@Bean EventExternalizationConfiguration eventExternalizationConfiguration() { return EventExternalizationConfiguration.externalizing() .select(EventExternalizationConfiguration.annotatedAsExternalized()) .route( ArticlePublishedEvent.class, it -> RoutingTarget.forTarget("baeldung.articles.published").andKey(it.slug()) ) .mapping( ArticlePublishedEvent.class, it -> new ArticlePublishedKafkaEvent(it.slug(), it.title()) ) .build(); }
|
在这种情况下,我们将从ArticlePublishedEvent中删除路由信息,并保留@Externalized注释,使其不具有任何值:@Externalized public record ArticlePublishedEvent(String slug, String title) { }
|
七、结论
在本文中,我们讨论了需要我们从事务块内发布消息的场景。我们发现这种模式可能会对性能产生很大的影响,因为它可能会长时间阻塞数据库连接。
之后,我们使用 Spring Modulith 的功能来监听 Spring 应用程序事件并自动将其发布到 Kafka 主题。这种方法使我们能够异步外部化事件并更快地释放数据库连接。