以事件源方式构建事件驱动系统 - Jayanth


首先了解业务逻辑可以建模为事件驱动的状态机:

  • 状态机——状态机是一种数学抽象,用于设计基于行为模型的算法。状态机读取一组输入并根据这些输入更改为不同的状态。
  • 状态— 状态是对等待执行转换的系统状态的描述。
  • 转换——转换是从一种状态到另一种状态的变化。转换是在满足条件或收到事件时执行的一组操作。
  • 事件——驱动状态变化的实体。
  • 事件驱动状态机——如果从一种状态到另一种状态的转换是由事件或消息触发的(不是基于消费/解析字符或基于条件),则状态机是事件驱动的。
  • 事件驱动系统——事件驱动架构是关于通过发布事件而不是进行(例如)RPC/CRUD 相互调用或操纵共享状态的组件通信。这是一种通信策略(尽管它通常依赖于消息在相当长的时间内持续存在直到它们被消耗)。

事件驱动系统的实现范式

范式 1 — 面向状态的对象模型或属性值模型:
事件驱动系统的实现可以通过仅捕获系统的当前状态来完成。这自然是基于面向模型状态的持久性,它只保留最新版本的实体状态。状态存储为属性值的字典或记录,属性是一等公民,只能通过允许的封装方法作为事件操作进行修改。

public class button {
    
      boolean state;
    
          
      public void buttonPressed(){
      
          state = !state;
      
      }    
      
      public void setState(boolean state){
        
          this.state = state;
    
      }
      
      public boolean getState(){
        
          return state;
    
      }
                
}

例如,按钮应用的状态,最初只被存储为一个属性值,即开或关,与 buttonPressed 的修改动作一起封装为实体关系模型或对象模型或属性值模型中的方法。这个系统仍然是事件驱动的,因为它对事件进行操作,但事件不是这个系统的第一等公民,而是实体按钮的当前属性值或状态,无论它是开还是关,都是第一等公民。buttonPressed的事件或封装方法没有被存储,而只是急切地应用于更新当前的状态,因此,不能被重放或穿越时间。

考虑另一个购物车服务的例子,购物车被表示为carts表中的一条记录,cart-items表在关系模型中表示 "101号购物车中有2条香蕉,价格为1美元一条 "的行。为了向购物车添加物品,我们向购物车项目表插入(或者更新,如果只是数量上的变化)行。当购物车被签出时,我们用json或其他序列化的formart发布购物车的表示及其内容,以便运输服务拥有购物车中的内容,并将carts.checked_out设置为TRUE。

这就是一个完全合理的事件驱动架构。购物车服务将有一个已签出的购物车传达给发货服务。然而,它不是以事件为源的,也就是说,我们不能从事件中重建数据库表,因为我们没有跟踪这些事件。

范式2--事件溯源:
事件驱动系统的实现也可以通过只捕获表现在系统中的事件来完成。这自然是基于事件源(使用事件日志)的持久化,其中每个状态/属性的突变被存储为一个单独的记录,称为事件。事件是持久化中的第一等公民,而状态和属性是事件的副作用。我们存储事件,并在查询解析器中懒散地应用它来推导出状态或属性。事件也可以被重放或穿越时间。

enum event {
       BUTTON_PRESSED
   
   }
   public class event_sourced_button {
       List < event > buttonEvents = new ArrayList < event > ();
       public void buttonPressed() {
           buttonEvents.add(event.BUTTON_PRESSED);
       }
       private boolean applyEvent(event e, boolean state) {
           if (e == event.BUTTON_PRESSED)
               state = !state;
           return state;
       }
       public boolean getState(boolean initialState) {
           boolean state = initialState;
           for (event e: buttonEvents)
               state = applyEvent(e, state);
           return state;
       }
   }

例如,按钮应用的事件源实现不会存储按钮的状态,而是存储按钮上的buttonPressed事件,并逐一应用这些事件来推导出当前状态。这个系统是事件驱动的,也是事件源的,因为它把事件作为这个系统的一等公民来操作,只存储它们,然后,从事件列表中得出实体按钮的当前属性值或状态,无论它是打开还是关闭。事件只是被懒惰地应用于评估当前状态,给定初始状态,因此,可以重放或时间追溯到某些时间或事件。

同样地,前面的购物车服务的例子也可以被做成事件源。它将事件存储在事件存储中(可以是为事件源的需要而设计的数据存储,也可以是以特定方式使用的关系型数据库或非关系型数据库(例如事件表))。
以下是购物车101的事件序列,可能被写入事件存储:

  1. AddItem { “banana”,
  2. AddItem { “banana”, $1.00 }
  3. AddItem { “apple”, $1.50 }
  4. AddItem { “banana”, $1.00 }
  5. RemoveItem { “apple” }
  6. DiscountApplied { Requirement { 2, “banana” }, -$0.10 }
  7. AddItem { “mango”, $2.00 }
  8. CheckedOut { Items { Item { “banana”, 2, $1.00 }, Item { “mango”, 1, $2.00 } }, Discount {“banana”, 2, -$0.10} }
    .00 }
  9. AddItem { “apple”,
  10. AddItem { “banana”, $1.00 }
  11. AddItem { “apple”, $1.50 }
  12. AddItem { “banana”, $1.00 }
  13. RemoveItem { “apple” }
  14. DiscountApplied { Requirement { 2, “banana” }, -$0.10 }
  15. AddItem { “mango”, $2.00 }
  16. CheckedOut { Items { Item { “banana”, 2, $1.00 }, Item { “mango”, 1, $2.00 } }, Discount {“banana”, 2, -$0.10} }
    .50 }
  17. AddItem { “banana”,
  18. AddItem { “banana”, $1.00 }
  19. AddItem { “apple”, $1.50 }
  20. AddItem { “banana”, $1.00 }
  21. RemoveItem { “apple” }
  22. DiscountApplied { Requirement { 2, “banana” }, -$0.10 }
  23. AddItem { “mango”, $2.00 }
  24. CheckedOut { Items { Item { “banana”, 2, $1.00 }, Item { “mango”, 1, $2.00 } }, Discount {“banana”, 2, -$0.10} }
    .00 }
  25. RemoveItem { “apple” }
  26. DiscountApplied { Requirement { 2, “banana” }, -[list=1]
  27. AddItem { “banana”, $1.00 }
  28. AddItem { “apple”, $1.50 }
  29. AddItem { “banana”, $1.00 }
  30. RemoveItem { “apple” }
  31. DiscountApplied { Requirement { 2, “banana” }, -$0.10 }
  32. AddItem { “mango”, $2.00 }
  33. CheckedOut { Items { Item { “banana”, 2, $1.00 }, Item { “mango”, 1, $2.00 } }, Discount {“banana”, 2, -$0.10} } .10 }
  34. AddItem { “mango”, .00 }
  35. CheckedOut { Items { Item { “banana”, 2,
  36. AddItem { “banana”, $1.00 }
  37. AddItem { “apple”, $1.50 }
  38. AddItem { “banana”, $1.00 }
  39. RemoveItem { “apple” }
  40. DiscountApplied { Requirement { 2, “banana” }, -$0.10 }
  41. AddItem { “mango”, $2.00 }
  42. CheckedOut { Items { Item { “banana”, 2, $1.00 }, Item { “mango”, 1, $2.00 } }, Discount {“banana”, 2, -$0.10} }
    .00 }, Item { “mango”, 1, .00 } }, Discount {“banana”, 2, -[list=1]
  43. AddItem { “banana”, $1.00 }
  44. AddItem { “apple”, $1.50 }
  45. AddItem { “banana”, $1.00 }
  46. RemoveItem { “apple” }
  47. DiscountApplied { Requirement { 2, “banana” }, -$0.10 }
  48. AddItem { “mango”, $2.00 }
  49. CheckedOut { Items { Item { “banana”, 2, $1.00 }, Item { “mango”, 1, $2.00 } }, Discount {“banana”, 2, -$0.10} } .10} }

最后一个事件(连同它是针对购物车101的事实,这是与事件相关的元数据)可以被用来推导出一个用于发布的事件。需要注意的一个关键问题是,除了这些事件,没有其他东西被写入数据库。

使用基于事件源的事件驱动系统的好处
由于事件源要求保留事件的持久性日志,这就形成了推导应用状态的单一事实来源,这反过来又提供了一些可以建立在事件日志之上的设施。

  • 完全重建Complete Rebuild: 应用状态可以被完全丢弃,并通过在空的应用状态上重新运行事件日志来重建。
  • 时间性查询Temporal Query: 通过从空白的应用状态开始,然后重新运行到某个特定时间或事件的事件,可以在任何时间点确定应用状态。
  • 事件回放:如果任何过去的事件是不正确的,可以通过反转它和后来的事件来计算后果,最后重新播放新的正确事件和后来的事件。这也可以通过空白的应用程序状态来实现,并在修复它们和它们的顺序后重新播放事件。
  • 系统调试/审计:在事件来源的日志中,事件的附加存储提供了一个审计跟踪,可用于监测针对数据存储采取的行动,通过随时重放事件,重新生成当前状态的物化视图或预测,并协助测试和调试系统。

为基于事件源的事件驱动系统使用事件存储
事件源模式通过存储事件序列(每个事件都代表数据或状态的变化)来捕获应用程序的状态转换或数据突变,每个事件都记录在一个不可变的只读存储中。该不可变的纯应用商店作为应用程序状态和数据操作的单一真理来源,以及,事件商店通常发布这些事件,以便消费者可以得到通知,并在需要时可以处理它们。例如,消费者可以启动任务,将事件中的操作应用于其他系统,或执行完成操作所需的任何其他相关动作,从而将应用程序与订阅的系统解耦。

一个事件来源的持久化将把实体建模为事件流,并保持这些事件流的不可改变的日志。当实体状态或属性发生变化时,会产生并保存一个新的事件。当实体状态需要恢复时,该实体的所有事件被读取,每个事件被应用于改变状态,达到实体的正确最终状态。请注意,这里的状态,是事件流在实体上的纯函数应用。

下面是一个在事件源的事件存储持久化之上的EntityStoreAdaptor的例子,看起来是这样的。

public class EntityStoreAdapter {
    EventDatabase db;
    Serializer serializer;
    //Command Applier Eager
    public void Save < T > (T entity) where T: Entity {
        var changes = entity.changes;
        if (changes.IsEmpty()) return;
// nothing to do
        var dbEvents = new List < DbEvent > ();
        foreach(var event in changes) {
            var serializedEvent = serializer.Serialize(event);
            dbEvents.Add(
                data: new DbEvent(serializedEvent),
                type: entity.GetTypeName();
            );
        }
        var streamName = EntityStreamName.For(entity);
        db.AppendEvents(streamName, dbEvents);
    }
   
//Query Resolver Lazy
    public T Load < T > (string id) where T: Entity {
        var streamName = EntityStreamName.For(entity);
        var dbEvents = db.ReadEvents(streamName);
        if (dbEvents.IsEmpty()) return default (T);
// no events
        var entity = new T();
        foreach(var event in dbEvents) {
            entity.When(event);
        }
        return entity;
    }


基于事件源的事件驱动系统的应用状态具体化
在面向终端用户的应用程序中,应用程序的当前状态需要按需推导,这是从实体状态之上的行动的事件物化中推导出来的。这也可以通过一个预定的工作来完成,这样实体的状态就可以被存储起来,以便查询和展示。

例如,一个系统可以维护一个所有客户订单的物化视图,用来填充用户界面的部分内容,例如聚合视图。当应用程序添加新的订单,添加或删除订单上的项目,以及添加运输信息时,描述这些变化的事件可以被处理并用于更新物化视图。

事件源通常与CQRS模式相结合,这是一种将读与写分离的架构风格。在CQRS架构中,数据从写数据库流向读数据库并被物化,在此基础上运行查询。由于读/写层是分开的,系统最终保持一致,但可扩展到大量的读和写。

基于事件源的事件驱动系统中需要注意的问题

  • 最终一致性 - 读取不会基于事件的最新写入,因为在创建新的物化视图或通过事件重放预测数据之间会有延迟。在发布事件、创建物化视图和通知消费者之间的延迟期间,新的事件将被写入事件日志。
  • 事件日志的不可更改性--事件日志作为一个单一的真相来源,需要是不可更改的,因此,事件数据不应该被更新。为了更新一个实体以撤销一个变化,就是要添加一个相应的事件来修复它。甚至事件模式的改变也需要在事件日志存储中的所有事件上完成。
  • 事件排序和线性 - 由于多个应用程序或发布者将创建和发布事件到事件存储,事件排序和线性对于保持数据的一致性变得非常重要。为每个事件添加一个时间戳,或者用一个增量标识符注释每个请求产生的事件,可以解决排序冲突的问题。
  • 消费者幂等性--事件发布可能至少有一次(因为保持精确的一次会很困难),所以事件的消费者必须是幂等的。如果事件被处理超过一次,他们必须不重新应用事件中描述的更新,以避免对实体状态或数据属性计算产生不必要的副作用。
  • 快照和具体化--事件需要定期被快照和具体化,特别是当事件流的大小很大,以处理对模型状态及其属性数据的按需查询时。

事件来源和变化数据捕获之间的区别
变化数据事件使用底层数据库交易日志作为真理的来源。该事件是基于事务日志所属的数据库,而不是原始的应用程序,只要事件是基于可变的数据库持久化的(不是不可变的),该事件就可用,这意味着与数据库数据模型的耦合更紧密。CDC以创建、更新或删除的方式捕获事件的效果,例如,按钮被更新为关闭状态。

你现在已经理解了事件驱动系统的架构,并深入到事件源来实现这样的系统。然而,在继续实施这种复杂的事件驱动架构的系统之前,需要考虑到事件源系统的使用情况。当数据中的意图、目的或原因需要被记录为发生的事件,并能够重放它们以恢复系统的状态、回滚变化或保持历史和审计日志时,就应该使用事件源,即事件作为系统的自然第一类功能特征出现,系统可以接受数据实体模型的最终一致性作为其非功能的副作用。