这篇博客试图解释我们如何利用Spring的TransactionSynchronization来实现在事务提交后执行业务代码,以及如何使用 Spring AOP巧妙优雅实现的。
在spring中使用事务时,往往需要一些特定的方法在事务提交成功后才能运行,我们通过一个例子来理解这一点:-
假设一个系统中有两个微服务,第一个管理用户生命周期,我们称之为用户管理服务(UMS),另一个为用户准备订阅源(Feed Service)。
当新用户签约在UMS做一些验证,然后将一条消息推向create_feed的Apache Kafka的主题Topic。另一方面,Feed服务侦听有关create_feed主题的消息,然后调用 UMS 以获取用户的兴趣以准备Feed。
你看到这个流程有很大的问题吗!?如果注册过程中的任何后续方法失败 - 用户详细信息从未保存在系统中并且 Feed 服务的get user interest调用将会失败,该怎么办?当谈论多个微服务交互时,这个问题会被放大。
可能还有许多其他要求需要某些特定方法仅执行事务后提交,让我们看看如何优雅地解决这个问题!
解决方案
可以在TransactionSynchronizationAdapter的事务提交afterCommit方法后执行一段代码:
TransactionSynchronizationManager.registerSynchronization( new TransactionSynchronizationAdapter() { @Override public void afterCommit() { // code for publishing message to kafka } });
|
虽然这是可行的,但它需要在任何使用它的地方添加大量样板代码,这根本不是解决问题的一种非常干净的方法。
让我们看看如何创建一个注解 ( @PostCommit) 并使用 Spring AOP 围绕通知从后台驱动这一切。
#1 构建注释
@Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface PostCommit { }
|
这部分很简单,类似于在 Java 中创建任何其他注释
#2 构建 PostCommitAdapter
PostCommitAdapter 将提供两个功能
- 它将注册runnables到一个ThreadLocal
- 覆盖TransactionSynchronizationAdapter的AfterCommit,在事务提交时运行所有注册runnables
@Slf4j @Component public class PostCommitAdapter extends TransactionSynchronizationAdapter { private static final ThreadLocal<List<Runnable>> RUNNABLE = new ThreadLocal<>();
// 为提交后执行注册一个新的 runnable public void execute(Runnable runnable) { if (TransactionSynchronizationManager.isSynchronizationActive()) { List<Runnable> runnables = RUNNABLE.get(); if (runnables == null) { runnables = new ArrayList<>(); RUNNABLE.set(runnables); TransactionSynchronizationManager.registerSynchronization(this); } return; } // 如果事务同步未激活 runnable.run(); }
@Override public void afterCommit() { List<Runnable> runnables = RUNNABLE.get(); runnables.forEach(Runnable::run); }
@Override public void afterCompletion(int status) { RUNNABLE.remove(); } }
|
如果事务处于活动状态,execute方法是注册在ThreadLoca的runnables的方法,它只是继续并执行runnables。ThreadLocal里面的afterCommit运行所有的runnables
#3 使用 around 建议连接适配器和注释
为了让PostCommitAdapter的execute方法与@PostCommit注释挂钩,围绕@PostCommit创建的一个advice,每一个连接点封装runnables的执行方法,并调用PostCommitAdapter内部行execute方法:
@Aspect @Slf4j @AllArgsConstructor @Configuration public class PostCommitAnnotationAspect { private final PostCommitAdapter postCommitAdapter;
@Pointcut("@annotation(com...<package>..PostCommit)") private void postCommitPointCut(){}
@Around("postCommitPointCut()") public Object aroundAdvice(final ProceedingJoinPoint pjp) { postCommitAdapter.execute(new PjpAfterCommitRunnable(pjp)); return null; }
private static final class PjpAfterCommitRunnable implements Runnable { private final ProceedingJoinPoint pjp;
public PjpAfterCommitRunnable(ProceedingJoinPoint pjp) { this.pjp = pjp; }
@Override public void run() { try { pjp.proceed(); } catch (Throwable e) { throw new RuntimeException(e); } } } }
|
用法
一旦编写了样板,用法就很简单了,无论应该在事务提交后执行哪种方法,都必须简单地用注释对其进行PostCommit注释。
示例:考虑具有 PostCommit 注释方法的两个类 A 和 B
Class A { @PostCommit void log(){ log.info("log from class A") } } Class B { @PostCommit void log(){ log.info("log from class B") } }
|
一个驱动类调用这些方法:
Class mainClass { @Transactional void transactionalMethod(Entity entity){ someOperation(entity) log.info("inside transaction"); a.log(); b.log(); save(entity); log.info("end of method"); } }
|
执行后输出:
> inside transaction > ** saving entity > log from class A > log from Class B
|