Saga模式实现事件驱动系统集成


在当今快节奏、互联的世界中,企业和开发人员不断寻求创新方法来构建高效且可扩展的应用程序。事件驱动架构 (EDA) 是最引人注目且势头强劲的架构范例之一。事件驱动的应用程序旨在响应实时事件,并已成为构建能够适应动态环境并蓬勃发展的系统的首选。事件驱动应用程序成功的核心是集成,它在确保各种服务和组件之间的无缝通信和协作方面发挥着关键作用。

在这篇博文中,我们将探讨 Saga 模式如何补充事件驱动应用程序上下文中的集成。

事件驱动架构是一种允许系统通过事件交换进行通信和交互的方法。事件可以是任何值得注意的事件或操作结果,例如用户单击按钮、传感器读数、到达队列的消息或正在更新的数据库记录。与传统的请求-响应模型不同,EDA 支持解耦组件,使应用程序更加灵活、可扩展和有弹性。

Saga 模式:协调本地事务
在事件驱动的系统中,一项操作通常跨越多个服务或组件,确保数据一致性可能具有挑战性。Saga 模式引入了一种跨多个服务编排一系列本地数据库事务的方法,确保所有事务都成功,或者如果其中任何事务失败,系统将恢复到一致状态。

在 Fmodel 中,Saga 模式被实现为一种Saga数据类型。

data class Saga<in AR, out A>(
    val react: (AR) -> Flow<A>

该组件由两个类型参数组成:

  • AR / Action Result - 代表触发Saga的事件类型。请注意我们在此上下文中是如何使用 "Action Result操作结果 "和 "Event事件 "作为同义词的。在其他一些上下文中,通知或消息也可用作 "Action Result操作结果 "的同义词。
  • A / Action(命令/行动)- 代表命令类型,该命令是触发传奇的Event / Action Result 的命令。

一个服务的输出/事件/动作结果被转化为另一个服务的输入/命令/动作。
这就是 Saga 模式的精髓。它能做出反应!

通过实现事件溯源 模式并将事件存储为本地事务的结果,我们可以轻松实现 Saga 模式。它完全符合强大的事件驱动架构。

我们倾向于将业务错误建模为事件而不是抛出异常,这样,就可以在出现业务错误时轻松执行补偿操作/命令。

saga 组件是纯粹的,在这个层面上,我们不处理任何副作用。
它是一个简单的函数,接收一个操作结果并返回一个操作流。

下面是一个 Saga 组件的示例,该组件对餐厅事件(OrderPlacedAtRestaurantEvent、RestaurantCreatedEvent、RestaurantMenuChangedEvent、RestaurantErrorEvent)作出详尽反应,并将其转换为订单命令(CreateOrderCommand):

typealias OrderSaga = Saga<RestaurantEvent?, OrderCommand>

fun orderSaga() = OrderSaga(
    react = { e ->
        when (e) {
            is OrderPlacedAtRestaurantEvent -> flowOf(
                CreateOrderCommand(
                    e.orderId,
                    e.identifier,
                    e.lineItems
                )
            )

            is RestaurantCreatedEvent -> emptyFlow() // We are not interested in this event, so we return the empty flow of commands.
            is RestaurantMenuChangedEvent -> emptyFlow()
// We are not interested in this event, so we return the empty flow of commands.
            is RestaurantErrorEvent -> emptyFlow()
// We are not interested in this event, so we return the empty flow of commands.
            null -> emptyFlow()
// We ignore the `null` event by returning the empty flow of commands. Only the Saga that can handle `null` event/action-result can be combined (Monoid) with other Sagas.
        }
    }
)

将我们的系统与第三方支付提供商集成的 Saga 组件示例。如下图所示,该组件对所有事件(OrderPreparedEvent、OrderCreatedEvent......)作出详尽反应,并将其转化为命令:

typealias PaymentSaga = Saga<Event?

fun paymentSaga() = PaymentSaga(
   react = { e ->
      当 (e) {
         is OrderPreparedEvent -> flowOf(PayCommand(orderId = e.identifier))// 我们只对 "OrderPreparedEvent "感兴趣,并将其转换为 "PayCommand"。
         is OrderCreatedEvent -> emptyFlow()
// 我们对该事件不感兴趣,因此返回空命令流。
         is OrderErrorEvent -> emptyFlow()
// 我们对该事件不感兴趣,因此返回空命令流。
         is OrderPayedEvent -> emptyFlow()
// 我们对此事件不感兴趣,因此返回空命令流。
         is RestaurantEvent -> emptyFlow()
// 我们对此事件不感兴趣,因此返回空命令流。
         null -> emptyFlow()
// 我们忽略 "null "事件,返回空命令流。只有可以处理 "null "事件/动作结果的传奇可以与其他传奇组合(Monoid)。
      }
   }
)

Saga 管理器通过使用 Saga 组件作为既定算法来处理所有副作用(从事件存储区读取事件、向命令总线发布命令或直接调用服务等)。Saga 管理器的具体实现取决于您。我们将其视为一个事件存储(或 Kafka)事件订阅器/处理程序:

  • 对事件做出异步反应并发布命令、
  • 降低服务之间的运行时耦合度,并
  • 提高系统的弹性(重试、错误处理等)

事件建模与集成
事件建模通过对两种模式进行分类来促进集成:

  • 翻译--将外国事件翻译成我们自己系统更熟悉的命令
  • 自动化--将我们的事件翻译成国外系统的命令,但它建立在翻译之上,并添加了一个待办事项列表/视图,可以更稳健地控制向国外系统发布命令的过程。

Saga 模式非常适合这两种模式。顾名思义,它是一种翻译模式(将事件翻译成命令)。它可以将外来事件翻译成我们自己系统中更熟悉的命令,或者将我们的事件翻译成外来系统的命令,从而充当一个防腐层。

演示使用PostgreSQL 支持的事件存储,针对事件源和事件流进行了优化。
Fstore-SQL仅使用 SQL (PostgreSQL) 来启用事件溯源和基于池的事件流模式。

结论
集成可实现不同服务之间的无缝通信和协作,从而形成灵活且可扩展的架构。同时,Saga模式保证了数据的一致性,并有助于管理事件驱动系统中的业务事务。

随着企业继续采用事件驱动架构来满足实时数据处理和事件处理的需求,掌握集成艺术并了解如何有效应用 Saga 模式对于构建健壮、适应性强且有弹性的应用程序至关重要。通过结合这两种方法,开发人员可以构建在性能、可靠性和可扩展性方面表现出色的事件驱动应用程序,使企业能够在日益互联的世界中蓬勃发展。

持久存储事件才能真正实现事件流和事件驱动的架构。这是业务流程稳健自动化的先决条件。考虑Event Sourcing作为事件驱动架构的基础。