如何实现跨Mysql、Redis和Mongo分布式事务? - dongfu


如何组合多个存储引擎合并组成分布式事务?
Mysql、Redis、Mongo都是非常火爆的存储,各有各的优势。在实际应用中,同时使用多个存储是很常见的,保证跨多个存储的数据一致性成为一种需求。
本文给出了一个跨多个存储引擎实现分布式事务的示例:Mysql、Redis 和 Mongo。此示例基于分布式事务框架https://github.com/dtm-labs/dtm,希望能帮助您解决跨微服务的数据一致性问题。
灵活组合多个存储引擎形成分布式事务的能力是DTM首先提出的,目前还没有其他分布式事务框架声明过这样的能力。

假设用户现在正在参与促销活动:他们有余额,充值话费,促销活动将赠送商城积分。余额存储在Mysql中,账单存储在Redis中,商城积分存储在Mongo中。由于推广时间有限,存在参与失败的可能,所以需要回滚支持。
对于上述问题场景,可以使用DTM的Saga事务,下面我们将详细讲解解决方案。

准备数据
第一步是准备数据。为了方便用户快速上手示例,我们在en.dtm.pub中准备了所有数据,包括 Mysql、Redis 和 Mongo,具体连接用户名和密码可以在dtm-labs/ dtm 示例

编写业务代码
先从最熟悉的存储引擎Mysql的业务代码说起:
以下代码使用 Golang:其他语言,如 C#、PHP、Java 可以在这里找到:[DTM SDKs]( https://en.dtm.pub/ref/sdk.html )

func SagaAdjustBalance(db dtmcli.DB, uid int, amount int) error { 
  _, err := dtmimp.DBExec(db, "update dtm_busi.user_account set balance = balance + ? where user_id = ?" , amount, uid) 
  return err 
}

该代码主要执行用户在数据库中余额的调整。在我们的示例中,这部分代码不仅用于 Saga 的正向操作,还用于补偿操作,其中只需要传入一个负数进行补偿。
对于 Redis 和 Mongo,业务代码处理类似,只是增加或减少相应的余额。

如何确保幂等性?
对于 Saga 事务模式,当我们在子事务服务中出现临时故障时,将重试失败的操作。这种失败可能发生在子事务提交之前或之后,因此子事务操作需要是幂等的。
DTM 提供辅助表和辅助函数,帮助用户快速实现幂等性。对于Mysql,它会在业务数据库中创建一个辅助表barrier,当用户启动一个事务调整余额时,它会先插入Gid到barrier表中。如果存在重复行,则插入失败,然后跳过平衡调整,保证幂等性。使用辅助函数的代码如下:

app.POST(BusiAPI+"/SagaBTransIn", dtmutil.WrapHandler2(func(c *gin.Context) interface{} { 
  return MustBarrierFromGin(c).Call(txGet(), func(tx *sql.Tx) error { 
    return SagaAdjustBalance (tx, TransInUID, reqFrom(c).Amount, reqFrom(c).TransInResult) 
  }) 
}))


Mongo处理幂等性的方式与Mysql类似,不再赘述。

Redis对幂等性的处理与Mysql不同,主要是事务的原理不同。Redis 事务主要通过 Lua 的原子执行来保证。DTM 辅助函数将通过 Lua 脚本调整平衡。在调整余额之前,它会Gid在 Redis 中查询。如果Gid存在则跳过平衡调整;如果没有,它将记录Gid并执行平衡调整。用于辅助函数的代码如下:

app.POST(BusiAPI+"/SagaRedisTransOut", dtmutil.WrapHandler2(func(c *gin.Context) interface{} { 
  return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransOutUID), -reqFrom(c).Amount , 7*86400) 
}))

如何实现补偿回滚?
对于 Saga,我们也需要处理补偿操作,但补偿不是简单的反向调整,还有很多陷阱需要注意。

  • 一方面,补偿需要考虑幂等性,因为补偿中也存在上小节描述的失败和重试。
  • 另一方面,补偿也需要考虑“空补偿”,因为 Saga 的前向操作可能会返回失败,这可能发生在数据调整之前或之后。对于已经提交调整的失败,我们需要执行反向调整,但是对于没有提交调整的失败,我们需要跳过反向调整。

在DTM提供的helper函数中:

  • 一方面会根据forward操作插入的Gid判断补偿是否为空补偿,
  • 另一方面会再次插入Gid+'compensate'来判断是否补偿是重复操作。如果补偿操作正常,则对业务进行数据调整;如有空赔或重复赔,则跳过业务上的调整。

Mysql代码如下:

app.POST(BusiAPI+"/SagaBTransInCom", dtmutil.WrapHandler2(func(c *gin.Context) interface{} { 
  return MustBarrierFromGin(c).Call(txGet(), func(tx *sql.Tx) error { 
    return SagaAdjustBalance (tx, TransInUID, -reqFrom(c).Amount, "") 
  }) 
}))

Redis 的代码如下。

app.POST(BusiAPI+"/SagaRedisTransOutCom", dtmutil.WrapHandler2(func(c *gin.Context) interface{} { 
  return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransOutUID), reqFrom(c).Amount, 7*86400) 
}))

补偿服务代码与前向操作的代码几乎相同,只是金额乘以-1。DTM 辅助函数会自动正确处理幂等性和空值补偿。

其他例外
在编写正向操作和补偿操作时,实际上还有一个例外,叫做“暂停”。当超时或重试次数达到配置的限制时,全局事务将回滚。正常情况是在补偿之前进行正向操作,但在“进程暂停”的情况下,可以在正向操作之前进行补偿。所以前向操作也需要判断是否已经执行了补偿,如果已经执行,也需要跳过数据调整。

对于 DTM 用户,这些异常已经得到了优雅和妥善的处理,作为用户的您只需要按照MustBarrierFromGin(c).Call上面描述的调用,根本不需要关心它们。DTM 处理这些异常的原理在这里详细描述:异常和子事务障碍

启动分布式事务
编写完各个子事务服务后,下面的代码代码会发起一个 Saga 全局事务。

saga := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, dtmcli.MustGenGid(dtmutil.DefaultHTTPServer)).
  Add(busi.Busi+"/SagaBTransOut", busi.Busi+"/SagaBTransOutCom", &busi.TransReq{Amount: 50}).
  Add(busi.Busi+"/SagaMongoTransIn", busi.Busi+"/SagaMongoTransInCom", &busi.TransReq{Amount: 30}).
  Add(busi.Busi+"/SagaRedisTransIn", busi.Busi+"/SagaRedisTransOutIn", &busi.TransReq{Amount: 20})
err := saga.Submit()

在这部分代码中,创建了一个Saga全局事务,由3个子交易组成。

  1. 从Mysql转出50个
  2. 转入30个到Mongo
  3. 转入20个到Redis

在整个交易过程中,如果所有的子交易都成功完成,那么全局交易就会成功;如果其中一个子交易返回业务失败,那么全局交易就会回滚。

运行
如果你想运行一个完整的上述例子,步骤如下。
1.、运行DTM

git clone https://github.com/dtm-labs/dtm && cd dtm
go run main.go

2.、运行一个成功的例子

git clone https://github.com/dtm-labs/dtm-examples && cd dtm-examples
go run main.go http_saga_multidb

3、运行一个失败的例子

git clone https://github.com/dtm-labs/dtm-examples && cd dtm-examples
go run main.go http_saga_multidb_rollback

你可以修改这个例子来模拟各种临时故障、空补偿情况以及其他各种例外情况,当整个全局事务完成后,数据是一致的。

总结
本文给出了一个跨越Mysql、Redis和Mongo的分布式事务的例子。它详细描述了需要处理的问题,以及解决方案。
本文的原则适用于所有支持ACID事务的存储引擎,你也可以快速扩展到其他引擎,如TiKV。
欢迎访问github.com/dtm-labs/dtm:它是一个专门的项目,让微服务中的分布式事务变得更容易。它支持多种语言,以及多种模式,如2段消息事务、Saga、Tcc和XA。