Temporal让Saga模式变得简单


如果你想知道Saga模式是否适合你的场景,问问你自己:你的逻辑是否涉及多个步骤,其中一些步骤跨越机器、服务、分片或数据库,对于这些步骤,部分执行是不可取的?

事实证明,这正是sagas的用武之地。

也许你正在检查库存,向用户的信用卡收费,然后履行订单。也许你正在管理一个供应链。

Saga模式是有帮助的,因为它基本上是作为一个存储程序进度的状态机,防止多次信用卡收费,必要时进行恢复,并确切地知道在断电的情况下如何安全地恢复到一个一致的状态。

一个常见的基于生活的例子用来解释Saga模式对失败的补偿方式,就是旅行计划:
假设你很想去西雅图的杜瓦米什地区淋雨,你需要购买一张飞机票,预订一家酒店,并在雷尼尔山获得一张有向导的背包旅行的门票。

这三项任务都是耦合的:

  1. 如果你无法购买飞机票,就没有理由去买其他的东西。
  2. 如果你买到了飞机票,但没地方住,你就会想取消那个飞机预订(或重新尝试酒店预订或找其他地方住)。
  3. 最后,如果你不能预订那个背包旅行,真的没有其他理由来西雅图,所以你不妨取消整个事情。(开玩笑!)。

在现实世界中,有许多 "要么做,要么不做 "的软件应用:

  • 如果你成功地向用户收费,但你的履行服务报告说该物品已经缺货,
  • 如果你不退还费用,你将会有不高兴的用户。
  • 如果你遇到相反的问题,意外地 "免费 "交付物品,你就会被淘汰。
  • 如果协调机器学习数据处理管道的机器崩溃了,但后续机器继续处理数据,却无处报告他们的数据,你可能会有一个非常昂贵的计算资源账单。

在所有这些情况下,有某种 "进度跟踪 "和补偿代码来处理这些 "做完或不做任何事情 "的任务,正是Saga模式所提供的。

在Saga的说法中,这些 "全做或不做 "的任务被称为长期运行的事务。这并不一定意味着这类行动要运行 "很长时间",只是它们在逻辑时间上比在本地运行的与单个数据库交互的东西需要更多步骤。


如何构建一个Saga?
一个Saga是由两部分组成的:

  • 如果你需要 "撤消 "某些事情,定义好的 "往回走 "的行为(即补偿)。
  • 努力向前迈进的行为(即保存状态,以便在面临失败时知道从哪里恢复)。

补偿只是Saga设计模式的一半。本质上是整个系统的状态管理。补偿行动模式可以帮助你知道,如果一个单独的步骤(或者用Temporal术语说,一个活动)失败了,该如何恢复。但如果整个系统瘫痪了呢?你从哪里开始恢复?由于不是每一个步骤都可能有附带的补偿,你将被迫根据存储的补偿来做最好的猜测。Saga模式记录了你目前所处的位置,这样你就可以继续朝前推进了。

那么,我如何在自己的代码中实现Saga?
这是个有点棘手的问题,但是通过用Temporal运行你的代码,你会自动得到你的状态保存,并在任何级别的失败中重试。这意味着使用Temporal的Saga模式就像编码一样简单,当一个步骤(活动)失败时,你希望采取补偿。结束了。

这个魔法背后的原因是,Temporal在设计上自动跟踪你的程序的进展,并且在面对灾难性的失败时,可以从它离开的地方继续进行。此外,Temporal会在失败时重试活动,除了指定重试策略外,你不需要添加任何代码,例如:

RetryOptions retryoptions = RetryOptions.newBuilder()
       .setInitialInterval(Duration.ofSeconds(1))
       .setMaximumInterval(Duration.ofSeconds(100))
       .setBackoffCoefficient(2)
       .setMaximumAttempts(500).build();


因此,为了表达我的程序的高级逻辑,包括假期预订步骤和我希望在失败时采取的补偿措施,它的伪代码如下:

try:
   registerCompensationInCaseOfFailure(cancelHotel)
   bookHotel
   registerCompensationInCaseOfFailure(cancelFlight)
   bookFlight
   registerCompensationInCaseOfFailure(cancelExcursion)
   bookExcursion
catch:
   run all compensation activities

在Java中,Saga类为你记录了补偿的情况:

@Override
public void bookVacation(BookingInfo info) {
   Saga saga = new Saga(new Saga.Options.Builder().build());
   try {
       saga.addCompensation(activities::cancelHotel, info.getClientId());
       activities.bookHotel(info);

       saga.addCompensation(activities::cancelFlight, info.getClientId());
       activities.bookFlight(info);

       saga.addCompensation(activities::cancelExcursion, 
                            info.getClientId());
       activities.bookExcursion(info);
   } catch (TemporalFailure e) {
       saga.compensate();
       throw e;
   }
}

在其他语言的SDK中,你可以很容易地自己编写addCompensation和 compensate函数。这里有一个Go语言的版本:

func (s *Compensations) AddCompensation(activity any, parameters ...any) {
    s.compensations = append(s.compensations, activity)
    s.arguments = append(s.arguments, parameters)
}

func (s Compensations) Compensate(ctx workflow.Context, inParallel bool) {
    if !inParallel {
        // Compensate in Last-In-First-Out order, to undo in the reverse order that activies were applied.
        for i := len(s.compensations) - 1; i >= 0; i-- {
            errCompensation := workflow.ExecuteActivity(ctx, s.compensations[i], s.arguments[i]...).Get(ctx, nil)
            if errCompensation != nil {
                workflow.GetLogger(ctx).Error(
"Executing compensation failed", "Error", errCompensation)
            }
        }
    } else {
        selector := workflow.NewSelector(ctx)
        for i := 0; i < len(s.compensations); i++ {
            execution := workflow.ExecuteActivity(ctx, s.compensations[i], s.arguments[i]...)
            selector.AddFuture(execution, func(f workflow.Future) {
                if errCompensation := f.Get(ctx, nil); errCompensation != nil {
                    workflow.GetLogger(ctx).Error(
"Executing compensation failed", "Error", errCompensation)
                }
            })
        }
        for range s.compensations {
            selector.Select(ctx)
        }
    }
}

高水平的Go代码的步骤和补偿将看起来与Java版本非常相似:

func TripPlanningWorkflow(ctx workflow.Context, info BookingInfo) (err error) {
   options := workflow.ActivityOptions{
       StartToCloseTimeout: time.Second * 5,
       RetryPolicy:         &temporal.RetryPolicy{MaximumAttempts: 2},
   }

   ctx = workflow.WithActivityOptions(ctx, options)

   var compensations Compensations

   defer func() {
       if err != nil {
           // activity failed, and workflow context is canceled
           disconnectedCtx, _ := workflow.NewDisconnectedContext(ctx)
           compensations.Compensate(disconnectedCtx, true)
       }
   }()

   compensations.AddCompensation(CancelHotel)
   err = workflow.ExecuteActivity(ctx, BookHotel, info).Get(ctx, nil)
   if err != nil {
       return err
   }

   compensations.AddCompensation(CancelFlight)
   err = workflow.ExecuteActivity(ctx, BookFlight, info).Get(ctx, nil)
   if err != nil {
       return err
   }

   compensations.AddCompensation(CancelExcursion)
   err = workflow.ExecuteActivity(ctx, BookExcursion, info).Get(ctx, nil)
   if err != nil {
       return err
   }

   return err
}

上面这个高层次的代码序列被称为 "Temporal Workflow"。而且,如前所述,通过与Temporal一起运行,我们不必担心实现任何bookkeeping 记录跟踪,例如或通过event sourcing事件源跟踪我们的进展,或添加重试和重启逻辑,因为这些都是免费的。

因此,当编写与Temporal一起运行的代码时,你只需要担心编写补偿的问题,其余的都是免费提供的。

幂等idempotent
好吧,好吧,还有第二件事需要 "担心"。正如你可能记得的,Saga由两部分组成,第一部分是我们之前编码的那些补偿。第二部分,"努力向前迈进 "涉及在面对失败时可能重新尝试一项活动。

让我们来探讨一下其中的一个步骤,好吗?

Temporal完成了重试和跟踪整体进度的所有繁重工作,然而由于代码可以重试,你,即程序员,需要确保每个Temporal活动都是幂等的。这意味着bookFlight无论它被调用一次还是多次,它的观察结果是相同的。

为了更具体一点,一个设置某个字段foo=3的函数是幂等的,因为无论你调用多少次,之后foo都是3。而函数foo += 3则不是幂等,因为foo的值取决于你的函数被调用的次数。

非幂等有时看起来更微妙:如果你有一个允许重复记录的数据库,一个调用INSERT INTO foo (bar) VALUES (3)的函数会轻率地在你的表中创建尽可能多的记录,因此不是幂等idempotent。发送电子邮件或转账的函数的天真实现,默认也不是幂等的。

如果你现在正慢慢地开始担心,因为你的真实世界应用程序做了很多比set foo=3更复杂的事情,请注意。有一个解决方案:

你可以使用一个独特的标识符,称为idempotency key,或有时称为referenceId或类似的东西来唯一地识别一个特定的交易,并确保酒店预订交易有效地发生一次。这个idempotency key的方式可以根据你的应用需求来定义。在旅行计划应用程序中,BookingInfo中的clientId字段被用来唯一地识别交易。

type BookingInfo struct {
   Name     string
   ClientId string
   Address  string
   CcInfo   CreditCardInfo
   Start    date.Date
   End      date.Date
}


然而,使用clientId作为我们的键,限制了一个特定的人同时预订一个以上的假期。这可能就是我们想要的。

然而,一些商业应用可能会选择通过结合clientId和workflowId来建立一个idempotency key,以允许每个客户一次预订一个以上的假期。如果你想要一个真正唯一的idempotency key,你可以向工作流程传递一个UUID。你可以根据你的应用程序的需要来选择。

许多处理货币的第三方API已经为这个目的接受了idempotency键。如果你需要自己实现这样的东西,使用原子写来记录你目前已经看到的idempotency key,如果一个操作的idempotency key在 "已经看到 "的集合中,就不要执行这个操作。

效益与复杂度
Saga模式确实会增加你的代码的复杂性,所以不要因为你有微服务就在你的代码中实现它,这很重要。

然而,如果你需要完成一项涉及多个服务的任务(如预订机票和酒店的旅行),并且部分执行实际上并不成功,那么Saga将是你的朋友。

此外,如果你发现你的saga变得特别笨重,可能是时候重新考虑你的微服务是如何划分的,并卷起老袖子重构。

总的来说,Temporal使得在你的代码中实现传奇模式变得相对简单,因为你只需要编写每个步骤所需的补偿。

代码案例:GitHub