使用Spring实现反应式事务(Reactive Transactions)


本文探讨如何使用RDBC2或MongoDB来使用Spring Reactive的事务支持。

在还没有加入响应式/反应式事务集成之间,Spring认为没有必须进行Reactive事务管理,因此,Spring Framework不支持Reactive @Transaction。

随着时间的推移,MongoDB开始支持MongoDB Server 4.0的多文档事务,R2DBC(反应式SQL数据库驱动程序的规范)开始出现,最终在Template API中提供inTransaction(…) 方法作为执行原生本级事务的工作单元。

虽然将inTransaction(…)方法用于较小的工作块很方便,但它并不反映Spring支持事务的方式。在使用命令式编程模型时,Spring Framework允许两种事务管理安排:@Transactional和TransactionTemplate(声明性的各自的程序化事务管理)。

这两种事务管理方法都建立在PlatformTransactionManager管理事务资源事务的基础之上。PlatformTransactionManager可以是Spring提供的事务管理器实现,也可以是基于JTA的Java EE实现。

两种方法的共同之处在于它们将事务状态绑定到ThreadLocal存储,这允许事务状态管理而不传递TransactionStatus对象。事务管理应该在后台以非侵入方式进行。因为我们没有让线程继续在事务中继续有作用工作的设想,因此ThreadLocal只在命令式编程中工作。

命令式编程事务管理工作机制
事务管理需要将其事务状态与执行相关联。在命令式编程中,这通常是ThreadLocal存储 - 事务状态被绑定到一个线程,假设前提是事务代码在容器调用它的同一个线程上执行。

反应式编程模型消除了命令式(同步/阻塞)编程模型的这一基本假设。仔细研究反应式执行情况,你会发现代码在不同的线程上执行。使用进程间通信时,这会更加明显。我们再也不能安全地假设我们的代码在同一个线程上完全执行了。

这种变化使的依赖ThreadLocal的事务管理实现无效。

我们需要一种不同的安排来反映事务状态,而不是一直传递一个TransactionStatus对象。

关联带外数据并不是反应空间中的新要求。我们在其他领域遇到过这种要求,例如SecurityContextSpring Security for reactive方法安全性(仅举一例)。Project Reactor是Spring自身构建其响应支持的反应库,自3.1版本开始就为订阅者的上下文提供了支持。

Reactor Context是替代ThreadLocal命令式编程的反应式编程,上下文允许将上下文数据绑定到特定的执行。对于反应式编程,这是一个Subscription。

Reactor Context允许Spring将事务状态以及所有资源和同步绑定到特定的Subscription状态。使用Project Reactor的所有反应代码现在都可以参与响应式事务。

反应性事务管理
从Spring Framework 5.2 M2开始,Spring通过ReactiveTransactionManagerSPI 支持响应式/反应式事务管理。

ReactiveTransactionManager是使用事务资源的反应式和非阻塞集成的事务管理抽象。它是一个会返回Publisher的反应式@Transactional方法元注解,使用TransactionalOperator实现可编程的事务管理。

两个反应式事务管理器实现是:

  • R2DBC通过Spring Data R2DBC 1.0 M2
  • MongoDB通过Spring Data MongoDB 2.2 M4

让我们来看看反应式式事务的样子:

class TransactionalService {

  final DatabaseClient db

  TransactionalService(DatabaseClient db) {
    this.db = db;
  }

  @Transactional
  Mono<Void> insertRows() {

    return db.execute()
      .sql("INSERT INTO person (name, age) VALUES('Joe', 34)")
      .fetch().rowsUpdated()
      .then(db.execute().sql(
"INSERT INTO contacts (name) VALUES('Joe')")
      .then();
  }
}

反应事务看起来非常类似于注释驱动中的命令事务,主要的区别在于我们使用DatabaseClient,这是一个反应性资源抽象。所有事务管理都在幕后进行,利用Spring的事务拦截器和ReactiveTransactionManager。

Spring基于方法返回类型分辨要应用的事务管理类型:

  • 方法返回一个Publisher类型:响应式事务管理
  • 所有其他return类型:传统的命令式事务管理

这种区别很重要,因为您仍然可以使用命令式组件,例如JPA或JDBC查询,如果将这些查询结果包装成一个Publisher类型,Spring将应用反应而不是命令式事务管理,反应式事务管理就不会打开ThreadLocal中绑定的JPA或JDBC所需的事务。

TransactionalOperator
下一步,让我们看一下编程化事务管理TransactionalOperator:

ConnectionFactory factory = …
ReactiveTransactionManager tm = new R2dbcTransactionManager(factory);
DatabaseClient db = DatabaseClient.create(factory);

TransactionalOperator rxtx = TransactionalOperator.create(tm);

Mono<Void> atomicOperation = db.execute()
  .sql("INSERT INTO person (name, age) VALUES('joe', 'Joe')")
  .fetch().rowsUpdated()
  .then(db.execute()
    .sql(
"INSERT INTO contacts (name) VALUES('Joe')")
    .then())
  .as(rxtx::transactional);

上面的代码包含一些值得注意的组件:

  • R2dbcTransactionManager:这是R2DBC的反应式事务管理器ConnectionFactory。
  • DatabaseClient:客户端使用R2DBC驱动程序提供对SQL数据库的访问。
  • TransactionalOperator:此运算符将所有上游R2DBC发布者与事务上下文相关联。您可以使用操作员样式as(…::transactional)或回拨样式execute(txStatus -> …)。

订阅后会懒惰地反应式事务,operator启动事务,设置适当的隔离级别并将数据库连接与其订户上下文相关联。所有参与(上游)Publisher实例都使用一个上下文绑定事务连接。

Reactive-functional operator 链可以是线性的(通过使用单个Publisher)或非线性的(通过合并多个流)。Publisher使用operator风格样式时,反应式事务将会影响所有上游。要将事务范围限制为特定的Publishers 集合,请应用回调样式,如下所示:

TransactionalOperator rxtx = TransactionalOperator.create(tm);

Mono<Void> outsideTransaction = db.execute()
  .sql("INSERT INTO person (name, age) VALUES('Jack', 31)")
  .then();

Mono<Void> insideTransaction = rxtx.execute(txStatus -> {
  return db.execute()
    .sql(
"INSERT INTO person (name, age) VALUES('Joe', 34)")
    .fetch().rowsUpdated()
    .then(db.execute()
      .sql(
"INSERT INTO contacts (name) VALUES('Joe Black')")
      .then());
  }).then();

Mono<Void> completion = outsideTransaction.then(insideTransaction);

在上面的示例中,事务管理仅限于在execute(…)里面订阅的Publisher实例。换句话说,事务是作用域的。execute(…)中Publisher实例参与事务,并且命名outsideTransaction的Publisher在事务之外执行其工作。

Spring Data MongoDB
R2DBC是Spring与反应式的集成之一。另一个事务集成是通过Spring Data MongoDB访问MongoDB,您可以使用反应式编程来参与多文档事务。

Spring Data MongoDB是使用ReactiveMongoTransactionManager,这是一个ReactiveTransactionManager实现。它创建会话并管理事务,以便在托管事务中执行的代码参与多文档事务。

以下示例显示了MongoDB的编程事务管理:

ReactiveTransactionManager tm 
  = new ReactiveMongoTransactionManager(databaseFactory);
ReactiveMongoTemplate template = …
template.setSessionSynchronization(ALWAYS);                                          

TransactionalOperator rxtx = TransactionalOperator.create(tm);

Mono<Void> atomic = template.update(Step.class)
  .apply(Update.set("state", …))
  .then(template.insert(EventLog.class).one(new EventLog(…))
  .as(rxtx::transactional)
  .then();

上面的代码设置一个ReactiveTransactionManager并用于TransactionalOperator在单个事务中执行多个写操作。ReactiveMongoTemplate被配置为参与反应式交易。