使用数据库实现状态机


大多数人都熟悉状态机并知道它们的价值。一般状态机库可以帮助您对状态进行建模,防止无效转换,并生成图表以帮助非技术人员理解代码的行为方式。

本文并不是要阐述状态机的情况。这是关于如何采用状态机的概念并使其与数据库模型一起工作,利用关系数据库(例如 Postgres 或 MySQL)来帮助您构建并发安全且高效的软件。

我在2015年加入GoCardless时第一次遇到这种模式。处理银行支付是一件需要多天时间的事情,而且状态性极强,因此团队最终构建了一个名为Statesman的库,提供了一个由底层转换表驱动的状态机,这并不奇怪。

GoCardless的大多数关键流程都使用了Statesman,到我离开时,我们的过渡表已经超过了10B行。它已成为一种必不可少的工具,我认为它是一种强大的竞争优势。Statesman甚至还体现在外部,例如GoCardless公共应用程序接口(API)中的端点提供了丰富的审计跟踪。

因此,如果您使用 Ruby,就赶紧使用 Statesman 吧。不过,对于那些希望获得这些优势但不使用 Ruby 的人来说,只要了解过渡表、锁定和边缘情况的细微差别,就能在几个小时内用自己选择的语言实现一个小型库。

这就是我前几天在 incident.io 所做的事情。这里有一份指南,你也可以照着做。

转换表
首先我们要说的是,在大多数应用程序中,您希望捕获资源通过状态机进行的每个转换并将其存储以供以后分析。在某些情况下,您甚至可以考虑转换的历史记录来决定下一步要转换到哪个状态。

这就是为什么任何数据库状态机的第一个组件都将创建一个表来包含转换。

-- Assuming the payments table is already here.
create table payments (/* ... */);

create table payment_transitions (
  id text primary key default generate_ulid() not null,
  payment_id text not null references payments(id),
  to_state text not null,
  most_recent boolean not null,
  sort_key integer not null,
  created_at timestamptz not null default now(),
  updated_at timestamptz not null default now()
);

每个转换行状态:

  • 它所属的父资源 ( payment_id)
  • 此转换将进入的状态 ( to_state)
  • 此转换是否是最新的 ( most_recent)
  • 允许转换逻辑排序的序数 ( sort_key)

现在我们添加两个索引支持的唯一约束,以确保状态机的完整性。

create unique index idx_payment_transitions_by_parent_most_recent
on payment_transitions
using btree(payment_id, most_recent)
where most_recent;

create unique index idx_payment_transitions_by_parent_sort_key
on payment_transitions
using btree(payment_id, sort_key);

第一个确保我们只能对 most_recent任何付款进行一次转换,如果我们想明智地询问“这笔付款处于什么状态?”,这是一个明确的要求。第二个确保我们在 上没有重复 sort_key:不太重要,但对于确保所有转换都可以严格排序很有用。

用代码表达状态机
现在我们有了一个用于存储转换的表,我们需要构建一个可用于在代码库中表达状态机的库。

在我们在 Incident.io 使用的 Go 库中,我们首先将转换表表示为我们的 ORM 可以使用的域模型:

type PaymentTransition struct {
  // ID is the unique ID of this transition
  ID string `json:
"id" gorm:"type:text;primaryKey;default:generate_ulid()"`
 
// PaymentID is a reference to the parent resource
  PaymentID string `json:
"payment_id"`
 
// ToState is where this transition was to
  ToState PaymentState `json:
"to_state"`
 
// MostRecent is true when this transition is the most recent
  MostRecent bool `json:
"most_recent"`
 
// SortKey provides ordinality over transitions
  SortKey int `json:
"sort_key"`
 
// CreatedAt is set upon transition creation
  CreatedAt time.Time `json:
"created_at"`
 
// UpdatedAt is set whenever this transition is modified
  UpdatedAt time.Time `json:
"updated_at"`
}

func (PaymentTransition) Parent() Payment {
  return Payment{}
}

func (a PaymentTransition) State() PaymentState {
  return a.ToState
}

// ParentColumn tells our machine library what column refers
// to the parent resource.
func (PaymentTransition) ParentColumn() (structField, column string) {
  return
"PaymentID", "payment_id"
}

您会注意到PaymentState这是一个新类型,我们将其定义为 Go 枚举值:

type PaymentState string

const (
  PaymentStatePendingSubmission PaymentState = "pending_submission"
  PaymentStateSubmitted         PaymentState =
"submitted"
  PaymentStatePaid              PaymentState =
"paid"
  PaymentStateCancelled         PaymentState =
"cancelled"
)

最后,我们让父资源 ( Payment) 实现一个Machine()将所有这些联系在一起的方法,并告诉我们的库有效的转换路径:

func (p *Payment) Machine() machine.Machine[PaymentTransition, Payment, PaymentState] {
  return machine.New[PaymentTransition, Payment, PaymentState](
    p, machine.Configure(
      machine.From(PaymentStatePendingSubmission).To(
        PaymentStateSubmitted,
      ),
      machine.From(PaymentStateSubmitted).To(
        PaymentStatePaid, PaymentStateCancelled,
      ),
    ),
  )
}

machine.New 会创建一个配置结构,记录每个状态以及允许过渡到的状态。在我们的示例中,这意味着 pending_submission 允许过渡到已提交,但不允许过渡到已支付paid 或已取消cancelled。

package machine

type Config[State comparable] struct {
  states map[State][]State
}

执行转换
现在,我们的代码已经知道了转换建模所需的一切,我们可以像这样转换付款:

transition, err := payment.Machine().
  TransitionTo(domain.PaymentStatePendingSubmission).
  Execute(ctx, db)


非常简单,但这给我们带来了什么?为什么使用数据库比在内存中执行更好?

因为,即使有并发进程在过渡同一付款,也只有在我们的配置允许源 -> 目的地的情况下,才会创建过渡。这对大多数现代应用程序来说都非常有用,因为它们同时有许多进程在工作,但又希望开发人员编写的代码能像单进程系统一样。

要让 Execute() 提供这样的保证,你需要了解数据库的配置,并注意如何加锁。不过,如果你是在读取提交模式(默认)下运行 Postgres,那么你可以复制这段代码,并确保安全。

在 SQL 中,Execute() 的实际运行方式是这样的,函数看起来是这样的

begin;

-- (1)
update payment_transitions
set most_recent = 'f'
where most_recent
and payment_id = 'PM123'
returning *;

-- (2)
/* use previous result to validate transition from -> to */

-- (3)
insert into payment_transitions (
  payment_id, sort_key, most_recent, to_state
)
values (
  'PM123', 20
/* previous sort_key + 10 */, 't', 'submitted'
);

commit;

第一次更新 (1) 将锁定之前最近的转换,并阻止任何竞争事务(这意味着它们将停止,直到锁被释放)。
第一个获胜者将返回之前的转换,并可以检查是否允许从之前的状态移动到下一个所需的状态,如果不允许,则将回滚事务。在我上面分享的 Go 实现中,这意味着检查Config结构以确保这是允许的。
最后,如果转换有效,我们将为目标状态插入一个新的转换行,并从前一个转换行most_recent设置并提前,依靠唯一索引来捕获任何冲突。sort_key

这就是第一个尝试转换的成功事务的运行方式,并且任何后续事务都将遵循以下两条路径之一:
(a) 初始转换失败
如果第一个事务失败,那么我们回滚,释放锁,竞争事务获取它并继续进行,就像第一个事务从未发生过一样。
这简化为“重新开始”。

(b) 初始转换成功
如果第一个事务成功,更新将解锁,但从已提交的 Postgres 读取中返回零行。
在我们看来,这要么是(i)没有过渡,我们应该假设这将是该父项的初始条目,要么(ii)我们被竞争交易击败。

我们假装这是初始过渡,如下所示:

  • (i) 我们是正确的,并且我们成功创建了初始转换。
  • (ii) 我们将尝试创建另一个most_recent转换,但我们的插入将因most_recent唯一索引而失败。

符合人体工程学的 API
使用机器 API 的开发人员可以从并发保护中受益,而无需过多考虑。

当我们遇到冲突时,库会返回,ErrTransitionConflict并且我们有一个帮助程序,如果返回此错误,它会重试。这意味着您只需将代码包装在重试中即可“解决”彼此竞争的并发进程,否则这将需要有人仔细考虑他们的并发模型并将这种复杂性带入他们的正常应用程序代码中。

通过数据库中的所有转换,通过利用数据库索引查找状态 Y 中的所有 X 变得高效且轻松。API 和生成的 SQL 如下所示:

/*
select
   *
from payments
join
  payment_transitions
  on payments.id = payment_transitions.payment_id
  and most_recent = 't';
*/

payments, err := domain.NewQuerier[domain.Payment](db).
  Joins(machine.InState[domain.PaymentTransition, domain.Payment](db,
    domain.PaymentStatePendingSubmission,
  )).
  Find(ctx)

我们还允许TransitionTo接收在转换表上设置辅助字段的任意部分,以便您可以针对需要它们的转换跟踪关联的元数据。

一个示例是,每当您将付款移至已提交时,都会附加一个提交 ID,这将submission_id在生成的转换行上设置一列:

transition, err := payment.Machine().
  TransitionTo(domain.PaymentStatePendingSubmission).
  With(
    domain.PaymentTransitionBuilder.SubmissionID(payload.SubmissionID),
  )
  Execute(ctx, db)

总结
我在这篇文章中使用了银行业务的例子,但不要以为这仅限于支付。我们在 incident.io 使用这种方法:

  • 为事件更新提供动力,其中每个更新都有信息、严重程度、状态等,而 incident_updates 则是过渡表。
  • 跟踪警报源的状态,这样我们就能记录何时设置、激活或停用警报源以及发生原因。
  • 我们的提示功能可创建 "提示运行",并在工作过程中进行转换。

只要你按照我给出的说明操作,你就能继承并发保护功能,如果你尝试一下,我保证你很快就会意识到它们的价值。即使是当你的数据团队注意到建立模型是多么容易的时候,因为有比以前更丰富的数据可用,而且数据的排列也比以前一致。