使用Spring TransactionSynchronization执行事务后提交的调度方法 - Singh


这篇博客试图解释我们如何利用Spring的TransactionSynchronization来实现在事务提交后执行业务代码,以及如何使用 Spring AOP巧妙优雅实现的。
在spring中使用事务时,往往需要一些特定的方法在事务提交成功后才能运行,我们通过一个例子来理解这一点:-
假设一个系统中有两个微服务,第一个管理用户生命周期,我们称之为用户管理服务(UMS),另一个为用户准备订阅源(Feed Service)。
当新用户签约在UMS做一些验证,然后将一条消息推向create_feed的Apache Kafka的主题Topic。另一方面,Feed服务侦听有关create_feed主题的消息,然后调用 UMS 以获取用户的兴趣以准备Feed。
你看到这个流程有很大的问题吗!?如果注册过程中的任何后续方法失败 - 用户详细信息从未保存在系统中并且 Feed 服务的get user interest调用将会失败,该怎么办?当谈论多个微服务交互时,这个问题会被放大。
可能还有许多其他要求需要某些特定方法仅执行事务后提交,让我们看看如何优雅地解决这个问题!
 
解决方案
可以在TransactionSynchronizationAdapter的事务提交afterCommit方法后执行一段代码:

TransactionSynchronizationManager.registerSynchronization(
    new TransactionSynchronizationAdapter() {
        @Override
        public void afterCommit() {
            // code for publishing message to kafka
        }
});

虽然这是可行的,但它需要在任何使用它的地方添加大量样板代码,这根本不是解决问题的一种非常干净的方法。
让我们看看如何创建一个注解 ( @PostCommit) 并使用 Spring AOP 围绕通知从后台驱动这一切。
 
#1 构建注释

@Target(ElementType.METHOD) 
@Retention(RetentionPolicy.RUNTIME) 
public @interface PostCommit { 
}

这部分很简单,类似于在 Java 中创建任何其他注释
 
#2 构建 PostCommitAdapter
PostCommitAdapter 将提供两个功能
  1. 它将注册runnables到一个ThreadLocal
  2. 覆盖TransactionSynchronizationAdapter的AfterCommit,在事务提交时运行所有注册runnables

@Slf4j 
@Component 
public class PostCommitAdapter extends TransactionSynchronizationAdapter { 
    private static final ThreadLocal<List<Runnable>> RUNNABLE = new ThreadLocal<>(); 

    // 为提交后执行注册一个新的 runnable
     public void execute(Runnable runnable) { 
        if (TransactionSynchronizationManager.isSynchronizationActive()) { 
            List<Runnable> runnables = RUNNABLE.get(); 
            if (runnables == null) { 
                runnables = new ArrayList<>(); 
                RUNNABLE.set(runnables); 
                TransactionSynchronizationManager.registerSynchronization(this); 
            }
            return
        }
       
// 如果事务同步未激活
        runnable.run(); 
    } 

    @Override 
    public void afterCommit() { 
        List<Runnable> runnables = RUNNABLE.get(); 
        runnables.forEach(Runnable::run); 
    } 

    @Override 
    public void afterCompletion(int status) { 
        RUNNABLE.remove(); 
    } 
}

如果事务处于活动状态,execute方法是注册在ThreadLoca的runnables的方法,它只是继续并执行runnables。ThreadLocal里面的afterCommit运行所有的runnables
  
#3 使用 around 建议连接适配器和注释
为了让PostCommitAdapter的execute方法与@PostCommit注释挂钩,围绕@PostCommit创建的一个advice,每一个连接点封装runnables的执行方法,并调用PostCommitAdapter内部行execute方法:
@Aspect
@Slf4j
@AllArgsConstructor
@Configuration
public class PostCommitAnnotationAspect {
    private final PostCommitAdapter postCommitAdapter;

    @Pointcut("@annotation(com...<package>..PostCommit)")
    private void postCommitPointCut(){}


    @Around(
"postCommitPointCut()")
    public Object aroundAdvice(final ProceedingJoinPoint pjp) {
        postCommitAdapter.execute(new PjpAfterCommitRunnable(pjp));
        return null;
    }

    private static final class PjpAfterCommitRunnable implements Runnable {
        private final ProceedingJoinPoint pjp;

        public PjpAfterCommitRunnable(ProceedingJoinPoint pjp) {
            this.pjp = pjp;
        }

        @Override
        public void run() {
            try {
                pjp.proceed();
            } catch (Throwable e) {
                throw new RuntimeException(e);
            }
        }
    }
}


 
用法
一旦编写了样板,用法就很简单了,无论应该在事务提交后执行哪种方法,都必须简单地用注释对其进行PostCommit注释。
示例:考虑具有 PostCommit 注释方法的两个类 A 和 B

Class A {
   @PostCommit
   void log(){
      log.info("log from class A")
   }
}
Class B {
   @PostCommit
   void log(){
      log.info(
"log from class B")
   }
}

一个驱动类调用这些方法:
Class mainClass {
      @Transactional
      void transactionalMethod(Entity entity){
          someOperation(entity)
          log.info("inside transaction");
          a.log();
          b.log();
          save(entity);
          log.info(
"end of method");
      }
}

执行后输出:
> inside transaction
> ** saving entity
> log from class A
> log from Class B