函数化事件溯源的决策者模式 - thinkbeforecoding


决策者模式是一种思考随时间变化的系统的概念方式。应用层和域代码之间的概念接口。它具有在它们之间产生极低摩擦的优势。
 
六边形架构

              ┌───────────────────┐
              │  Clock    System  │ 
              │   │               │
 Actions      │   ▼               │    Outputs
              │  ┌─────────────┐  │
    ────────────►│  Subsystem  │───────────────►
              │  └─────────────┘  │
              │     ▲        │    │
              │     │        ▼    │
              │  ┌─────────────┐  │
              │  │    State    │  │
              │  └─────────────┘  │
              └───────────────────┘


输入接口可以被抽象化(HTTP API、消息队列、UI 事件、命令行参数、定时器或警报……)并且参数可以直接传递给子系统。输出交互也可以被抽象并隐藏在接口后面(出站 HTTP 调用、消息或通知发送......)。保存状态的状态也可以隐藏在接口后面,以保护子系统免受实现细节的影响。
子系统中的代码就只是由系统逻辑组成。我们通常所说的业务逻辑或域。我们称其余的代码为应用层。
这就是此描述了六边形架构,其中应用程序核心域通过端口接口与其环境松散耦合,实现为技术基础架构的适配器。
。。。
 
决策者模式
决策者模式在六边形基础上进化为:

         Command  ┌──────────────┐
        ─────────►│              │ Events
         State    │    decide    ├───────┐
        ┌────────►│              │       │
        │         └──────────────┘       │
        │         ┌──────────────┐ Event │
        │  State  │              │◄──────┘
        ├───◄─────┤    evolve    │ State
        │         │              │◄──────┐
        │         └──────────────┘       │
        └────────────────────────────────┘


当系统最初启动时,它处于初始状态。目前,还没有发生任何事情。
当第一个命令到达时,我们有一个命令和一个状态,我们可以将它们传递给决定函数。它将命令的结果作为事件列表返回,即响应输入命令而发生的事情。
如果输出事件列表为空,则什么也没有发生,不需要计算新状态,我们准备好下一个命令。出于至少两个原因,这不应经常发生。一个几乎什么都不做的系统并不是很有趣,可能在没有事件溯源的情况下实现……但是什么都不做也会使某些情况更难诊断。当命令用于某事但未导致任何更改时,了解原因可能会很有趣。不发出事件将使基础设施问题和主动决定不做任何事情之间的区别变得更难找到。是系统崩溃了,还是只是决定什么都不做?
仍然可以通过获取为诊断而持久化的命令和可以从过去的事件中重建的状态,将它们传递给决定函数并查看结果,但它会要求仔细的代码分析或创建一个新的测试这个具体数据。这会占用开发人员的时间,最好避免。
如果业务规则决定不做任何事情,建议将其明确化,并返回一个不会影响状态的事件。它将清楚地出现在生成的事件中,并且诊断将很容易。这可以由无法访问代码的支持团队完成。
如果因为我们已经处于预期状态而没有发生任何事情,我们就处于幂等命令的情况。在这种情况下,实际上最好不返回 Events。它避免了无用的重复事件使事件存储膨胀。
当 Event 列表包含一个 Event 时,我们可以调用带有当前 State 和这个 Event 的进化函数。它将返回新状态,用于下一个命令而不是初始状态。
有时,事件列表将包含多个事件。在这种情况下,我们将为每个传递先前计算状态的事件调用进化函数。
我们将迄今为止确定的七个元素的组合称为决策器:

  • 一个 Command 类型,表示可以提交给 Decider 的所有命令
  • 一个 Event 类型,表示可以由 Decider 产生的所有事件
  • 代表决策者所有可能状态的状态类型(可以只是所有事件的列表)
  • 初始状态是决策者在发生任何事情之前的状态
  • 一个决定函数,它接受一个命令和一个状态并返回一个事件列表
  • 一个进化函数,它接受一个状态和一个事件并返回一个新的状态
  • 一个 isTerminal 函数,它接受一个状态并返回一个布尔值

在 F# 中,这可以建模为:
type Decider<'c,'e,'s> = 
    { decide: 'c -> 's -> 'e list
      evolve: 's -> 'e -> 's
      initialState: 's
      isTerminal: 's -> bool }

 
在内存中运行
我们可以轻松地让决策程序在内存中的可变状态变量上运行:

module InMemory =
    let start (decider: Decider<'c,'e,'s>) =
        let mutable state = decider.initialState

        fun (command: 'c) ->
            let events = decider.decide command state
            state <- List.fold decider.evolve state events
            events


在数据库上运行
之前的实现不是持久化的,一旦关闭就会失去任何状态。我们可以像任何经典应用程序一样在数据库中持久化状态:

module WithPersistence =
    let start (decider: Decider<'c,'e,'s>) =
        fun id (command: 'c) ->
            // load state from database
            let state = Storage.loadState(id)
           
// this is the decision
            let events = decider.decide command state
           
// compute new state
            let newState = List.fold decider.evolve state events
           
// save state in database
            Storage.saveState(id, newState)
            events

此版本很简单,但如果状态可以同时修改,则可能很危险。如果操作从不并发,则此代码很简单而且非常好。
。。
 
在事件存储区运行
它也可以使用事件存储轻松实现:

module WithEventStore =
    let start (decider: Decider<'c,'e,'s>) =
        fun stream (command: 'c) ->
            // load all past events to compute current state
            let state = 
                EventStore.loadEvents(stream, 0)
                |> List.fold decider.evolve decider.initialState
           
// get events from the decision
            let events = decider.decide command state
           
// append events to stream
            EventStore.appendEvents(stream, events)
            events

此版本从每个命令的开头重新加载所有事件。这对于短流是完全可以接受的。由于事件通常很小,加载少于 100 个事件非常快并且折叠它们,几乎是实例(将其视为执行一些基本操作的 100 次迭代的循环)。
 
快照
一旦您有很多事件,重新加载所有内容可能会变得很长。然后可以定期保存状态以及产生该状态的流的版本。快照可以保存在键值存储中:

module WithSnapshots =
    let start (decider: Decider<'c,'e,'s>) =
        // load state using snapshot if any
        let loadState stream =
           
// load snapshot
            let snapVersion, snapState =
                Snapshots.tryLoadSnapshot(stream)
                   
// fallback to version 0 and initialState if not found
                |> Option.defaultValue (0, decider.initialState)

           
// load version and events after snapshot
            let version, events = 
                EventStoreWithVersion.loadEvents(stream, snapVersion)
           
// fold events after snapshot
            let state = List.fold decider.evolve snapState events
            version, state

        fun stream (command: 'c) ->
            let rec handle (version, state) =
               
// get events from the decision
                let events = decider.decide command state
               
// append events to stream
                match EventStoreWithVersion.tryAppendEvents(stream, version, events) with
                | Ok newVersion ->
                    if isTimeToSnapshot version then
                       
// it is time to save snapshot
                       
// compute state
                        let newState = List.fold decider.evolve state events
                       
// save it
                        Snapshots.saveSnapshot(stream, newVersion, newState)
                    events
                | Error(newVersion, newEvents) ->
                   
// there was a concurrent write
                   
// catchup missing events and retry
                    let newState = List.fold decider.evolve state newEvents
                    handle (newVersion, newState)


           
// load all past events to compute current state
           
// using snapshot if any
            let version, state = loadState stream
            handle (version, state)

 
结论
您可能已经注意到,我们为运行决策器而编写的所有基础设施代码与实际域代码完全无关。它可以运行一个简单的游戏或一个复杂的业务系统,它仍然会保持不变。
另一个有趣的点是决策程序可以以多种方式运行。纯粹在内存中,将状态存储在数据库或事件存储中。不需要更改域代码。这表明对基础设施的高度独立性。
原文点击标题