使用microsaga库在Java中实现微服务Saga事务管理


Saga 模式将微服务之间的本地事务组合成所谓的“Saga”。这种模式的主要思想是,如果本地事务成功完成,下一个事务将按顺序运行。如果本地事务失败,必须启动一系列补偿动作,以取消所有先前事务的结果。

saga 模式通过管理本地事务的顺序运行及其补偿来处理长时间运行的事务。
让我们看一下有 3 个服务的情况:

  • OrderService——负责产品订单管理;
  • PaymentService — 负责银行之间的资金转账;帐户
  • DeliveryService — 负责订购产品的所有交付。

我们正在尝试实施购买产品的用例。为此,我们应该使用订单服务创建一个订单(1),其中包含产品属性及其数量的描述。然后我们应该处理付款(2),将钱从用户帐户转移到公司帐户。最后,我们应该要求快递公司在特定日期为特定城市计划交付(3)。

buyProduct: 
  OrderClient.createOrder // 1 
  PaymentClient.processPayment
// 2 
  DeliveryClient.planDevivy
// 3


但是,我们不应该期望只有快乐的一天。通常会发生一些技术或业务问题,从连接问题到业务逻辑验证。对于 saga 方法,我们需要描述对我们行为的补偿。补偿意味着一些将回滚先前本地事务的操作。在我们的例子中,它可以这样做:

  1. 取消特定产品的订单。
  2. 取消付款并将资金转回初始状态。
  3. 取消计划交货。

此外,我们需要管理正确的补偿顺序。如果PaymentClient.processPayment部分失败,我们必须调用 2 次补偿:

  1. PaymentClient.cancelPayment取消付款并解冻客户的资金。
  2. OrderClient.cancelOrder取消订单并使其可供其他潜在客户使用。

因此,我们将收到以下带有补偿的操作集:

OrderClient.createOrder              action #1
  OrderClient.cancelOrder              compensation #1
PaymentClient.process                action #2
  PaymentClient.cancelPayment           compensation #2.1
  OrderClient.cancelOrder               compensation #2.2
DeliveryClient.planDevivery          action #3
  DeliveryClient.cancelDelivery         compensation #3.1
  PaymentClient.cancelPayment           compensation #3.2
  OrderClient.cancelOrder               compensation #3.3

总的来说,我们可以看到很多重复的代码和难以阅读的代码,其中有很多潜在的错误空间。

借助microsaga库,我们将尝试以更简单的方式实现 saga。

实践中的 Microsaga 库
它是一个开源库,可以在github 存储库中找到,也可以在 maven Central 中找到。
maven依赖示例

<!-- https://mvnrepository.com/artifact/io.github.rmaiun/microsaga -->
<dependency>
  <groupId>io.github.rmaiun</groupId>
  <artifactId>microsaga</artifactId>
  <version>1.0.0</version>
</dependency>


我们的任务是编写 saga,它负责购买具有给定 API 的产品,其中:

  • 客户——为产品付费的用户
  • 产品——一些唯一的标识符
  • 城市——交货信息

public void buyProduct(String client, String product, String city) {
    // create order
   
// transfer money
   
// plan delivery   
}

让我们为订单创建创建一个Saga步骤,其中补偿将删除产品sagaId。
值得一提的是,该库会根据 UUID 为每个 saga 自动创建 id,并隐含地为操作和补偿提供它。
因此,对于以下订单服务 API:

class OrderService {
  public OrderCreatedDto createOrder(CreateOrderDto dto) {...}
  public void cancelOrder(String sagaId) {...}
}

我们可以做这个传奇步骤:
public SagaStep<OrderCreatedDto> createOrderSagaStep(String user, String product) { 
    var action = Sagas.action("createOrder"
        sagaId -> orderService.createOrder(new CreateOrderDto(user, product, sagaId))); 
    var compensation = Sagas.compensation(
"cancelOrder", orderService::cancelOrder); 
    return action.compensate(compensation); 
}

在这里可以看到:

  1. Saga 动作有名称和动作,可以是 Callable、Function 或 Runnable,用于表示 void 动作。它消耗sagaId,这是由 microsaga 库隐式传递的。我们将使用它来标记我们在数据库中的数据。
  2. sagaId补偿也有名称和执行块,在我们的例子中,如果操作失败,它们也会用于数据删除。
  3. 最后一条指令创建了一个易于阅读的传奇步骤,并将动作和补偿配对。

我们可以一起改写如下:

public SagaStep<OrderCreatedDto> createOrderSagaStep(String user, String product) {
  var createOrderDto = new CreateOrderDto(user, product, sagaId);
  return Sagas.action("createOrder", sagaId -> orderService.createOrder(createOrderDto))
    .compensate(
"cancelOrder", orderService::cancelOrder);
}

让我们为支付服务做同样的事情:

class PaymentService {
  public PaymentProcessedDto processPayment(ProcessPaymentDto dto) {...}
  public void cancelPayment(String sagaId) {...}
}

这个 saga 步骤的唯一区别是我们将使用可重试补偿。Failsafe库是 microsaga 库中唯一的依赖项,因此我们可以将其强大的 API 用于不同的重试策略。
结果,我们的 saga 步骤:

var retryPolicy = new RetryPolicy<>().withDelay(Duration.of(5L, ChronoUnit.SECONDS));
var paymentDto = new ProcessPaymentDto(client, company, price, orderId, sagaId);
var processPaymentSagaStep = Sagas.action("processPayment",  
        sagaId -> paymentService.processPayment(processPaymentSagaStep)) 
    .compensate(
"cancelPayment", paymentService::cancelPayment, retryPolicy);


送货服务是我们链条中的最后一个,所以我们将为它声明saga步骤而不进行补偿,因为如果送货的本地交易不成功,就没有必要进行任何补偿。我们不期望从它那里得到任何有用的信息,所以我们对它使用voidRetryableAction。请注意,对于无效动作,将返回NoResultinstance。
最后,我们将完成这个saga步骤:

class DeliveryService {
  public void planDelivery(PlanDeliveryDto dto) {...}
  
  public SagaStep<NoResult> planDeliverySagaStep(PaymentProcessedDto dto, String city) { 
    var dto = new PlanDeliveryDto(dto.payer(), city);
    var retryPolicy = new RetryPolicy<NoResult>().withMaxRetries(3);
    return Sagas.voidRetryableAction("planDelivery",
        () -> deliveryService.planDelivery(dto), retryPolicy) 
      .withoutCompensation(); 
  }
}

现在,是时候结合所有这些步骤,运行我们的Saga了。

public void buyProduct(String client, String product, String city) { 
    var saga = createOrderSagaStep(client, product)
        .zipWith(createPaymentSagaStep, (paymentDtoIn, paymentDtoOut) -> Pair.of(paymentDtoOut, city))    \\ (1) 
        .flatmap(pair -> planDeliverySagaStep(pair.getFirst(), pair.getSecond()));    \\ (2) 
    SagaManager.use(saga)    \\ (3) 
        .transact()    \\ (4) 
        .peek(evalResult -> logSaga(evalResult.getEvaluationHistory()))    \\ (5) 
        .valueOrThrow();    \\ (6) 
  }

public static void logSaga(EvaluationHistory eh) { 
    eh.getEvaluations().forEach(e -> LOG.info("SAGA:{}:[{}] {} ({}) {}", eh.getSagaId(), 
        e.getEvaluationType().name().toLowerCase(), e.getName(), e.getDuration(), e.isSuccess() ?
"ok" : "error")); 
  }


由多个步骤组成:

  1. zipWith的组合提供:为特定的saga步骤的in和out dtos进行dto转换。
  2. 通过flatmap组合saga步骤,允许第二步使用第一步结果中的数据作为输入参数。
  3. Saga是懒计算的,所以在我们描述完所有的步骤后,我们需要执行它们。最简单的方法是通过静态方法的使用调用具有默认行为的saga管理。
  4. transact方法将运行整个saga,并以正确的方式对不成功的情况应用一连串的补偿。
  5. EvaluationResult是Saga执行的结果,它包括一些关于Saga运行的最小信息。为了对一个给定的结果应用一个副作用,引入了peek方法。
  6. 计算评价结果的处理有几个选项。我们将使用API来调用值,如果saga成功完成或抛出错误。


结论
Saga 模式的实现需要复杂的方法,对于每个系统,可以根据技术堆栈、预算、要求等大量因素使用各种技术和技术。但是,如果您需要一个小而智能的解决方案来处理分布式事务,你会发现 microsaga 库很有吸引力。它仅使用一个依赖项,具有简单、可读的 API,并且可以从 java 8 开始使用。它的语法与 java 流和 Spring Reactor Mono/Flux 运算符非常相似,这为现在使用而不是保存它提供了额外的好处它以备不时之需。