没有人知道事件溯源Event Sourcing是谁发明的。我无意中听说是汉谟拉比发明的。为什么?因为他规范了第一套会计规则。
事件溯源Event Sourcing(活动事件源)就像记账一样,我们记录每项业务活动(Event)作为一条新的条目(流水账)。毫不奇怪,会计模式同样适用于事件溯源。
例如,我们可以使用 "结账 "模式对生命周期流程进行有效建模。它的名称来自会计领域。所有财务数据都要进行汇总和核实,并在每个周期(如月和年)结束时创建最终报告。这些数据将作为下一周期的基础。
对于会计计算而言,不需要将整个历史数据进行结转;只需总结需要结转的关键部分,如期初余额等即可。
同样的模式也可用于时间建模。
结账(Closing the Books pattern)
结账事件溯源建模的本质。
正因为如此,我们可以保持简短,从而有效地运行我们的系统。我们对流程的生命周期进行切片,使用事件标记生命周期的开始和结束。
商店的收银员
我们可以尝试通过将特定收银机的所有交易保持在同一流上来对此进行建模,但如果我们正在为更大的百货商店构建一个系统,那么这种情况可能会迅速升级。我们最终可能会得到一个包含数千个事件的流。这很快就会使我们的处理效率低下且难以管理。
然而,如果我们与领域专家交谈,我们可能会意识到这不是我们业务的运作方式。所有付款均由特定收银员登记,收银员只关心本班发生的情况。他们不需要知道整个交易历史,只需要知道上一个班次留下的抽屉中的起始现金金额(称为浮动)。
让我们尝试在简化的事件模型中反映这一点:
public abstract record CashierShiftEvent { public record ShiftOpened( CashierShiftId CashierShiftId, string CashierId, decimal Float, DateTimeOffset StartedAt ): CashierShiftEvent;
public record TransactionRegistered( CashierShiftId CashierShiftId, string TransactionId, decimal Amount, DateTimeOffset RegisteredAt ): CashierShiftEvent;
public record ShiftClosed( CashierShiftId CashierShiftId, decimal DeclaredTender, decimal OverageAmount, decimal ShortageAmount, decimal FinalFloat, DateTimeOffset ClosedAt ): CashierShiftEvent;
private CashierShiftEvent(){} }
|
因此,我们有ShiftOpened和ShiftClosed事件标记收银班次的生命周期,并有TransactionRegistered事件来注册付款。当然,在轮班开始和结束之间可能会发生更多类型的事件,但让我们保持简单。
假设TransactionRegistered是您在整个轮班生命周期中可能遇到的事件的一个示例。
您可能会注意到CashierShiftId是一个类,而不是原始值。这是强类型键的示例:
public record CashierShiftId(string CashRegisterId, int ShiftNumber) { public static implicit operator string(CashierShiftId id) => id.ToString();
public override string ToString() => $"urn:cashier_shift:{CashRegisterId}:{ShiftNumber}"; }
|
我们的收银员班次CashierShiftId由两个组件组成:
我们使用统一资源名称结构 ( URN )。
urn:cashier_shift:{CashRegisterId}:{ShiftNumber}您可以使用任何格式,但 URN 是处理具有有意义段的 id 的标准化方法。如果我们有标准,为什么要重新发明轮子呢?我们的 URN 以前缀和流类型开头。然后,我们有收银机 ID 和班次号段。
如果我们想按时间方面对它进行切片,我们可以添加更多组件,例如日期。因此,每天都会重置轮班号码。最重要的是,我们正在定义收银轮班的明确生命周期。
默认情况下,使用Guid作为流键,但它允许使用 string。为此,我们需要在配置中进行更改:
services.AddMarten(options => { options.Events.StreamIdentity = StreamIdentity.AsString;
// (...) });
|
接下来,我们的收银班次可以建模如下:
public record CashierShift { public record NonExisting: CashierShift;
public record Opened( CashierShiftId ShiftId, decimal Float ): CashierShift;
public record Closed( CashierShiftId ShiftId, decimal FinalFloat ): CashierShift;
private CashierShift() { }
public string Id { get; init; } = default!; }
|
当没有班次、开放或关闭时,它要么不存在。它被修剪为仅包含决策所需的信息
我们可以根据事件构建它:
public record CashierShift { // (...)
public CashierShift Apply(CashierShiftEvent @event) => (this, @event) switch { (_, ShiftOpened shiftOpened) => new Opened(shiftOpened.CashierShiftId, shiftOpened.Float),
(Opened state, TransactionRegistered transactionRegistered) => state with { Float = state.Float + transactionRegistered.Amount },
(Opened state, ShiftClosed shiftClosed) => new Closed(state.ShiftId, shiftClosed.FinalFloat),
_ => this }; }
|
让我们添加最后的构建块:收银机设置。我们需要收银机来进行收银轮班:
public record CashRegister(string Id) { public static CashRegister Create(CashRegisterInitialized @event) => new(@event.CashRegisterId); }
public record CashRegisterInitialized( string CashRegisterId, DateTimeOffset InitializedAt );
public record InitializeCashRegister( string CashRegisterId, DateTimeOffset Now );
public static class CashRegisterDecider { public static object[] Decide(InitializeCashRegister command) => [new CashRegisterInitialized(command.CashRegisterId, command.Now)]; }
|
现在让我们定义流程中允许的一组方法。因此,对于事件来说,它将是:OpenShift、RegisterTransaction、CloseShift:
public abstract record CashierShiftCommand { public record OpenShift( string CashRegisterId, string CashierId, DateTimeOffset Now ): CashierShiftCommand;
public record RegisterTransaction( CashierShiftId CashierShiftId, string TransactionId, decimal Amount, DateTimeOffset Now ): CashierShiftCommand;
public record CloseShift( CashierShiftId CashierShiftId, decimal DeclaredTender, DateTimeOffset Now ): CashierShiftCommand;
private CashierShiftCommand(){} }
|
我们的决策器如下:
public static class CashierShiftDecider { public static object[] Decide(CashierShiftCommand command, CashierShift state) => (command, state) switch { (OpenShift open, NonExisting) => [ new ShiftOpened( new CashierShiftId(open.CashRegisterId, 1), open.CashierId, 0, open.Now ) ],
(OpenShift open, Closed closed) => [
new ShiftOpened( new CashierShiftId(open.CashRegisterId, closed.ShiftId.ShiftNumber + 1), open.CashierId, closed.FinalFloat, open.Now ) ],
(OpenShift, Opened) => [],
(RegisterTransaction registerTransaction, Opened openShift) => [ new TransactionRegistered( openShift.ShiftId, registerTransaction.TransactionId, registerTransaction.Amount, registerTransaction.Now ) ],
(CloseShift close, Opened openShift) => [ new ShiftClosed( openShift.ShiftId, close.DeclaredTender, close.DeclaredTender - openShift.Float, openShift.Float - close.DeclaredTender, openShift.Float, close.Now ) ], (CloseShift, Closed) => [],
_ => throw new InvalidOperationException($"Cannot run {command.GetType().Name} on {state.GetType().Name}") }; }
|
在这里使用了 C# 中新的模式匹配功能。如果您还不太感兴趣,我将传递状态和命令,并根据其类型,运行特定的业务逻辑。
- 根据OpenShift命令,当没有状态或增加最后一个班次编号时,我将返回ShiftOpened事件,班次编号等于0 ,
- 如果班次已经打开,我不会返回任何事件,使其变得无关紧要,因此我不会进行任何更改,也不会引发任何异常。
- 注册交易非常简单:添加一个新事件,
- 当尝试关闭一个已经关闭的班次时,我将其设置为幂等,就像打开一个已经打开的班次一样。
- 如果命令和状态的组合无效或意外,我只会抛出InvalidOperationException。模式匹配允许我节省一些 if 语句。
基本场景发生在这里:
(OpenShift open, Closed closed) => [
new ShiftOpened( new CashierShiftId(open.CashRegisterId, closed.ShiftId.ShiftNumber + 1), open.CashierId, closed.FinalFloat, open.Now ) ],
|
闭的班次与我们将打开的班次是不同的流。我们将其设置为:
new CashierShiftId( open.CashRegisterId, closed.ShiftId.ShiftNumber + 1 );
|
提醒一下,这也反映在我们的 id 结构中:urn:cashier_shift:{CashRegisterId}:{ShiftNumber}
关闭和打开班次作为一项操作
using CommandResult = (CashierShiftId StreamId, CashierShiftEvent[] Events);
public record CloseAndOpenCommand( CashierShiftId CashierShiftId, string CashierId, decimal DeclaredTender, DateTimeOffset Now );
public static class CloseAndOpenShift { public static (CommandResult, CommandResult) Handle(CloseAndOpenCommand command, CashierShift currentShift) { // close current shift var (currentShiftId, cashierId, declaredTender, now) = command; var closingResult = Decide(new CloseShift(currentShiftId, declaredTender, now), currentShift);
// get new current shift state by applying the result event(s) currentShift = closingResult.Aggregate(currentShift, (current, @event) => current.Apply(@event));
// open the next shift var openResult = Decide(new OpenShift(currentShiftId, cashierId, now), currentShift);
// 仔细检查是否真的打开过 var opened = openResult.OfType<ShiftOpened>().SingleOrDefault(); if (opened == null) throw new InvalidOperationException("Cannot open new shift!");
// 用各自的流 id 返回两个结果 return ((currentShiftId, closingResult), (opened.CashierShiftId, openResult)); } }
|
我们按顺序运行两个命令,返回两个操作的结果以及 id。
如果我们使用受益于 PostgreSQL 事务功能和Marten 内置工作单元。
public static class CloseAndOpenShift { public static async Task<CashierShiftId> CloseAndOpenCashierShift( this IDocumentSession documentSession, CloseAndOpenCommand command, int version, CancellationToken ct ) { var currentShift = await documentSession.Events.AggregateStreamAsync<CashierShift>(command.CashierShiftId, token: ct) ?? new CashierShift.NonExisting();
var (closingResult, openResult) = Handle(command, currentShift);
// Append Closing result to the old stream if (closingResult.Events.Length > 0) documentSession.Events.Append(closingResult.StreamId, version, closingResult.Events.AsEnumerable());
if (openResult.Events.Length > 0) documentSession.Events.StartStream<CashierShift>(openResult.StreamId, openResult.Events.AsEnumerable());
await documentSession.SaveChangesAsync(ct);
return openResult.StreamId; } }
|
由于 id 的可预测结构和调用StartStream方法,我们可以确保如果新班次已经打开,那么我们的操作将被拒绝。我们不会有任何重复的轮班。
我们可以在端点中使用此代码:app.MapPost("/api/cash-registers/{cashRegisterId}/cashier-shifts/{shiftNumber:int}/close-and-open", async ( IDocumentSession documentSession, string cashRegisterId, int shiftNumber, CloseAndOpenShiftRequest body, [FromIfMatchHeader] string eTag, CancellationToken ct ) => { var command = new CloseAndOpenCommand( new CashierShiftId(cashRegisterId, shiftNumber), body.CashierId, body.DeclaredTender, Now );
var openedCashierId = await documentSession.CloseAndOpenCashierShift(command, ToExpectedVersion(eTag), ct);
return Created( $"/api/cash-registers/{cashRegisterId}/cashier-shifts/{openedCashierId.ShiftNumber}", cashRegisterId ); } );
|
正如您所看到的,我们极大地受益于事件溯源和决策者模式的可重复性和可组合性以及 Marten 的事务功能。
我们还使用ETag 的乐观并发来确保我们不会遇到并发问题。因此,我们会知道我们正在根据预期状态做出决策。
关闭和开放轮班作为单独的操作
如果我们的生命周期是连续的,那么将关闭和打开作为一项操作的模式可能是合适的。然而,通常情况下,它们并非如此。收银员占用收银机的情况取决于客流量。有时可能没有人在特定的收银机上工作。怎么处理呢?
让我们从结尾开始,从结束开始,一步步解决这个问题。现在这将是一个简单的操作:
app.MapPost("/api/cash-registers/{cashRegisterId}/cashier-shifts/{shiftNumber:int}/close", ( IDocumentSession documentSession, string cashRegisterId, int shiftNumber, CloseShiftRequest body, [FromIfMatchHeader] string eTag, CancellationToken ct ) => { var cashierShiftId = new CashierShiftId(cashRegisterId, shiftNumber);
return documentSession.GetAndUpdate<CashierShift, CashierShiftEvent>(cashierShiftId, ToExpectedVersion(eTag), state => Decide(new CloseShift(cashierShiftId, body.DeclaredTender, Now), state), ct); } );
|
我在这里使用一个简单的包装器,它将从流中加载事件,从中构建状态,运行业务逻辑并附加事件结果事件(如果有):
public static class DocumentSessionExtensions { public static Task GetAndUpdate<T, TEvent>( this IDocumentSession documentSession, string id, int version, Func<T, TEvent[]> handle, CancellationToken ct ) where T : class => documentSession.Events.WriteToAggregate<T>(id, version, stream => { var result = handle(stream.Aggregate); if (result.Length != 0) stream.AppendMany(result.Cast<object>().ToArray()); }, ct); }
|
打开班次会比较复杂
我们需要获取上次关闭的班次编号和数据(例如float,即上次班次后抽屉中现金的状态)。当然,收银员可以提供之前的班次号码,但这很容易出错并且存在潜在的漏洞。如果我们自己跟踪的话会更好。最好的方法是构建一个可以缓存信息的模型。我们可以根据注册的事件定义更新的投影。
这样的模型应该包含什么?它有可能缓存打开新班次所需的所有信息,例如:
public record CashierShiftTracker( string Id, // Cash Register Id string LastClosedShiftNumber, string LastClosedShiftFloat, // (...) etc. );
|
看起来不错,因为我们可以从查询它并从中获取所需的信息开始。然而,在我看来,该解决方案不可扩展,并且在维护时可能会带来一些问题。我们的流程可能会发生变化,并且每次影响关闭或打开时我们都需要更新此预测。这可能会在模型版本控制、更新等方面产生问题。
结账模式总结
- 存储审计和下一个生命周期所需的已关闭流的摘要,
- 打开时,从关闭的生命周期中的最后一个事件获取数据,并使用它来启动新的周期和流。
保持流简短是事件溯源中最重要的建模实践。结账模式是实现这一目标的最大推动者。请参阅我的示例存储库中的完整代码。