使用Spring Boot的事件和事务注意点


如果您要使用spring transaction和event publishers编写代码,则需要记住一些规则:

  • 事务绑定到一个线程
  • 默认情况下,当您跳出标记为@Transactional的方法时,将提交事务
  • 默认情况下,事务内部调用的所有方法都使用它
  • 默认情况下,事件侦听器在调用程序线程中执行,这意味着它们使用与调用者相同的事务
  • 为了确保在提交调用者的事务之后调用您的方法,您需要使用@TransactionalEventListener
  • 如果您使用@TransactionalEventListener 然后 记住您需要启动一个新事务来提交监听器里面发生的任何写操作。

现在后端应用程序中存在一种趋势,即将服务调用流更改为基于事件的调用。我认为这是由于领域驱动设计DDD概念的影响越来越大。挑选这种方法有时是合理的,有时它只是“炒作”。我想集中讨论在数据库事务中可能遇到的基于spring的应用程序中引入事件的后果。

我们来看一下你可以在这里找到的一个简单的spring boot应用程序。它包含一个注册用户的其余API端点。注册包含两个操作:


此操作发生在UserService中的一个事务方法中,该方法从UserResource(spring controller)调用。到目前为止,没什么特别的。

@Transactional
@Service
public class UserService {

    public User createUser(User user) {
        User savedUser = userRepository.save(user);
        eventPublisher.publishEvent(new UserCreated(savedUser.id));
        return savedUser;
    }

}

让我们扩展我们的应用程序来处理UserCreated事件。
我写了一个监听器(UserQueueNotifier),其职责非常简单:

  • 订阅活动
  • 根据事件中的数据(userId)从数据库加载用户
  • 将电子邮件和生成的注册令牌放入jms队列,以模拟与负责发送注册确认电子邮件的外部系统的通信

注意点:有些人一眼就认识到我的事件中只有部分数据(userId)违反DDD规则。在纯DDD世界中,我们应该将用户包含在UserCreated事件中。我故意这样做有两个原因 - 一个是它简化了示例;第二个,也许更重要的是,我在一个不遵循所有DDD规则的实际项目中遇到了这样的解决方案。

@Component
public class UserQueueNotifier {

    @EventListener(UserCreated.class)
    public void onUserCreate(UserCreated userCreated) {
        User user = userRepository.find(userCreated.id);
        user.generateToken();
        userRepository.save(user);
        jmsTemplate.convertAndSend("emails", new EmailWithToken(user.email, user.token()).asJmsMessage());
    }
}

要订阅spring事件,您只需要编写一个void方法,该方法将事件作为参数并将其标记为@EventListener。为了证明一切都在一起工作,你可以从类UserResourceHbTest运行测试 (测试应该是不言自明的)。其中一个检查是否在注册后,队列中存在正确的消息。

@Test
public void registerPutsEmailAndTokenOnQueue() throws Exception {
    JSONObject createUserJson = new JSONObject()
            .put("login", "dev-user2")
            .put(
"email", "dev-user2@pragmatists.pl");

    api.post(
"users", createUserJson.toString());

    String message = (String) jmsTemplate.receiveAndConvert(
"emails");
    List<String> emailAndToken = newArrayList(Splitter.on(
";").split(message));
    assertThat(emailAndToken.get(0)).isEqualTo(
"dev-user2@pragmatists.pl");
    assertThat(emailAndToken.get(1)).isNotEmpty();
}


Listener(UserQueueNotifier)附带了UserRepository 接口的特殊实现,它在下面使用了spring数据框架。我调用了这个实现HbUserRepository

public interface UserRepository {

    User find(UserId userId);
    User save(User user);
}

public class HbUserRepository implements UserRepository {

    private final JpaUserRepository jpaUserRepository;

    public HbUserRepository(JpaUserRepository jpaUserRepository) {
        this.jpaUserRepository = jpaUserRepository;
    }

    @Override
    public User find(UserId userId) {
        return jpaUserRepository.findById(userId).orElse(null);
    }

    @Override
    public User save(User user) {
        return jpaUserRepository.save(user);
    }
}

让我们假设由于某种原因,您决定切换UserRepository接口的实现并使用本机sql。我称之为JdbcTemplateUserRepository  - 它使用JdbcTemplate进行查询。

public class JdbcTemplateUserRepository implements UserRepository {
    
    @Override
    public User find(UserId userId) {
        return jdbcTemplate.queryForObject("SELECT * FROM USER WHERE ID = ?", new Object[]{userId.asString()}, new UserRowMapper());
    }
    
}

在这个“基础设施”改变之后,我重新运行了测试(UserResourceJdbcTemplateTest):

测试结果是红色的!测试无法通过!

我希望你也会感到惊讶。一切都归功于Spring的事务传播。

事务边界是一个线索

我们需要从线索的顶部开始解释并查看事务开始的位置。UserService被标记为@Transactional,因此方法createUser中的所有内容(如果在同一个线程中运行)都在同一个事务中。通过存储库保存用户后,我们发布了一个事件。默认情况下,发布事件是同步的,因此您可以想象将标记为@EventListener的所有方法体内联到createUser方法中。在publishEvent方法下面,spring只是循环遍历所有侦听器的方法并简单地调用它们。没有魔法,没有隐藏的异步调用。

如果事件侦听器的代码与它的调用者在同一个线程中运行,则默认情况下它会传播相同的事务。这意味着当我们在UserQueueNotifier侦听器中调用UserRepository上的find方法时,我们仍处于未提交的事务中。在跳出标记为@Transactional的方法后,提交将在离开监听器之后才会发生。那么,我们如何从我们的数据库中读取用户呢?

这是棘手的部分 - 如果UserRepository使用spring数据提供的底层实现,那么可以找到用户,因为它存储在hibernate会话中。我们根本不触及数据库,我们只是从会话中检索用户。

如果我们通过JdbcTemplate将实现更改为直接对数据库本机sql进行查询,那么当然不会在数据库中找到用户,因为事务尚未提交!这就是我们进行红色测试的原因。

进入事件侦听器时提交事务
我们能用Spring机制处理这种常见情况吗?答案当然是肯定的。
需要在方法标记上增加@TransactionalEventListener注释。
默认情况下,它已将属性阶段设置为TransactionPhase。AFTER_COMMIT表示将在提交调用事务时执行侦听器代码。

@Component
public class UserQueueNotifier {

    @EventListener(UserCreated.class)
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void onUserCreate(UserCreated userCreated) {
        User user = userRepository.find(userCreated.id);
        user.generateToken();
        userRepository.save(user);
        jmsTemplate.convertAndSend("emails", new EmailWithToken(user.email, user.token()).asJmsMessage());
    }
}

提交后是否传播了事务?
问题是:当我们的侦听器代码在提交后执行或我们是否需要打开一个新代码时,我们仍处于事务中吗?如果我们在侦听器中编写一些写操作会发生什么?

关于Spring文档事务同步,我们需要阅读的文档。文档说“在事务提交后调用。可以在主要事务成功提交后立即执行进一步的操作。“但最重要的部分来自于此。“事务将已经提交,但事务资源可能仍然是活动的和可访问的。因此,此时触发的任何数据访问代码仍将“参与”原始事务,允许它执行一些清理(比如不再执行任何提交!),除非它明确声明它需要在单独的事务中运行”。

所以我们的情况是在一个交易中,但提交将不会完成!?这很奇怪,但文档附带了一个解决方案。我们需要声明这部分代码将在新事务中运行。它可以通过两种方式实现:

  • 我们可以使用@Transactional(propagation = Propagation.REQUIRES_NEW)启动新事务
  • 我们可以使用标准的@Transactional,但由于事务是在同一个线程中传播的,我们需要运行一个新的。我们可以在方法上使用@Async注释

在我的简单示例中,我决定使用相同的线程并在那里开始新的事务。选择第二种方法需要重写测试,必须能够等待这个新线程完成。

要在侦听器中提交事务,请在UserResourceTest中的registeredUserHasTokenGenerated方法中删除@Ignore。除非在侦听器方法上添加@Transactional(propagation = Propagation.REQUIRES_NEW),否则测试应该是失败的,因为未提交写入数据库(userRepository.save)。