微服务中的分布式事务:使用 Temporal 实现 Saga


微服务架构的一个重要设计原则是每个服务数据库模式。这种模式帮助我们保持服务松散耦合,以便它们可以独立开发、部署和扩展。换句话说,域数据被封装在微服务中,如果其他服务需要数据,则它们通过调用 API 来实现。

这一设计原则带来了一个有趣的挑战:在微服务中使用分布式事务协调写操作。

事务定义:
在数据库管理系统中,事务是单个逻辑或工作单元,有时由多个操作组成

这种分布式事务交易的一个例子是电子商务应用中的订单履行。一个订单履行的工作流程会导致对不同服务的写入,如订单服务、支付服务和运输服务。在分布式交易中,要么交易的所有步骤都需要成功,要么交易整体失败,也就是说,没有中间状态;例如,付款被扣除,但库存没有被保留。

在微服务应用中使用两阶段提交实现分布式事务可能不是一种选择,原因有很多,比如可扩展性问题、复杂性、服务数据库不支持等等。

那么,我们如何在微服务应用中实现事务呢?

答案是Saga!

什么是Saga?
Saga 是一系列本地事务,其中每个本地事务更新数据库并发布消息或事件以触发 Saga 中的下一个本地事务。如果本地事务失败,saga 将执行一系列 补偿事务,以撤消先前本地事务所做的更改。

在这里,“发布消息或事件”并不意味着 Saga 总是涉及发布/订阅或消息总线。正如我们很快就会看到的,有两种方法可以实现 Saga——编排和编排。Cholography 几乎总是涉及消息传递,而 Orchestration 则不需要。

从上图中可以看出,每个微服务都实现了一个对应业务事务步骤的补偿事务。

有两种常见的方法来实现 Saga——编排和编排。

编舞Choreography
在 Choreography 中,Saga 参与者在没有中央控制点的情况下交换消息。每个参与者在完成本地事务后发布领域事件,其他参与者对领域事件做出反应并执行他们的本地事务。

在失败的情况下,Saga 参与者发布补偿交易事件,其他参与者做出相应反应并撤销本地交易。

对于我们的示例用例,编排可能如下所示(为简洁起见未显示补偿):

编排Orchestration
在 Orchestration 中,一个集中的控制器(Orchestrator)告诉 Saga 参与者要执行什么本地事务。Saga Orchestrator管理 saga 请求,存储和解释每个步骤的状态,并通过补偿事务处理故障恢复。

对于我们的示例用例,编排可能如下所示:

在上图中,Order 服务充当中央控制器(Orchestrator)。

用Temporal实现 Saga
在微服务应用程序中实现 Saga 并不简单。此外,基于Choreography 的 Saga 比基于 Orchestration 的 Saga 实施起来要复杂得多。值得庆幸的是,像 Temporal 这样的工作流引擎极大地简化了 Saga 的实现。在 Temporal 中,Saga 被实现为 Orchestration。

在之前的文章Workflow Orchestration with Temporal and Spring Boot中,我们看到了分布式订单履行工作流(如下所示)在 Temporal 中的实现。


public void createOrder(OrderDTO orderDTO) {
  paymentActivity.debitPayment(orderDTO);
  reserveInventoryActivity.reserveInventory(orderDTO);
  shipGoodsActivity.shipGoods(orderDTO);
  orderActivity.completeOrder(orderDTO);
}

上面的工作流定义没有补偿事务步骤。因此,在发生故障时可能会出现数据一致性问题。例如,如果储备库存步骤出错,则付款应该反转,但事实并非如此。

要在 Temporal 中实现 Saga,我们需要做的就是将补偿事务定义为Activity并将它们与Saga对象关联为:

public void createOrder(OrderDTO orderDTO) {
    // Configure SAGA to run compensation activities in parallel
    Saga.Options sagaOptions = new Saga.Options.Builder().setParallelCompensation(true).build();
    Saga saga = new Saga(sagaOptions);
    try {
      paymentActivities.debitPayment(orderDTO);
      saga.addCompensation(paymentActivities::reversePayment, orderDTO);
      //Inventory
      inventoryActivities.reserveInventory(orderDTO);
      saga.addCompensation(inventoryActivities::releaseInventory, orderDTO);
      //Shipping
      shippingActivities.shipGoods(orderDTO);
      saga.addCompensation(shippingActivities::cancelShipment, orderDTO);
      //Order
      orderActivities.completeOrder(orderDTO);
      saga.addCompensation(orderActivities::failOrder, orderDTO);
    } catch (ActivityFailure cause) {
      saga.compensate();
      throw cause;
    }
  }

如果出现错误,Temporal 会在引发错误的Activity步骤之前运行所有补偿事务。
例如,如果步骤的执行shippingActivities.shipGoods(orderDTO)导致错误,则 Temporal 将该错误传播到 Workflow 实现OrderFulfillmentWorkflowImpl。
错误在 catch 块中被捕获,然后saga.compensate()被调用,它开始补偿并运行所有先前注册的补偿。
抛出错误throw cause会导致工作流失败。

我们可以使用 Temporal 客户端 SDK API 查询工作流状态或分析 UI 中的错误堆栈跟踪。

在我们的代码示例中,我们根据配置参数 simulate_error=true,在ShipmentServiceImpl类中通过抛出域ServiceException来模拟错误情况。


@Override
public String shipGoods(Double quantity) {
  // Simulate Error condition
  if (applicationProperties.isSimulateError() ) {
    log.error("Error occurred while shipping..");
    throw new ServiceException("Error executing Service");
  }
  UUID uuid = UUID.randomUUID();
  Thread.sleep(2000);
  return uuid.toString();
}

ShipmentServiceImpl是由ShippingActivitiesImpl调用的:

public void shipGoods(OrderDTO orderDTO) {
  log.info("Dispatching shipment,  order id {}", orderDTO.getOrderId());
  var trackingId = shipmentService.shipGoods(orderDTO.getQuantity());

  var shipment =
      Shipment.builder()
          .orderId(orderDTO.getOrderId())
          .productId(orderDTO.getProductId())
          .quantity(orderDTO.getQuantity())
          .trackingId(trackingId)
          .build();
  shipmentRepository.save(shipment);

  log.info("Created shipment for order id {}", orderDTO.getOrderId());
}

在我们补偿交易事务的简单实现中,我们只是改变域对象付款和库存的状态。例如,PaymentActivities补偿reversePayment实现为:

@Override
public void reversePayment(OrderDTO orderDTO) {
  log.info("Reversing payment for order {}", orderDTO.getOrderId());
  var payment =
      paymentRepository
          .getByOrderId(orderDTO.getOrderId())
          .orElseThrow(() -> new ResourceNotFoundException("Order id not found"));
  payment.setPaymentStatus(PaymentStatus.REVERSED);
  paymentRepository.save(payment);
}

显然,这种天真的实现不适用于生产用例。在生产代码中,我们可能希望调用外部支付服务来撤销支付,然后改变域对象的状态并持久化。

代码示例
您可以在GitHub 上找到本文的工作代码示例 . 要运行该示例,请克隆存储库,然后将 order-fulfilment-saga 作为项目导入您最喜欢的 IDE 中作为 Gradle 项目。

在此代码示例中,我们使用 Spring Boot 实现了微服务,并使用 Temporal Java 客户端 SDK 将它们与 Temporal 集成。

代码结构:

您可以在README.md中找到有关代码的更多信息。
您可以查看之前的文章Workflow Orchestration with Temporal and Spring Boot以了解有关如何在 Temporal 中实现工作流的更多信息。