ProcessManager:DDD流程管理器案例


领域驱动设计 (DDD) 是一种构建软件的方法,它试图通过关注核心领域、构建领域的可进化模型以及让领域专家参与模型的进化来开发和确保解决业务的复杂性一致的理解和语言。您可以将域视为问题空间。例如,允许人们从当地的外卖店在线订购食物,并为餐馆提供在线服务和处理订单的能力,而无需对基础设施进行前期投资,这就是 JUST EAT 的问题空间。

长处理时间和循环思考
长时间运行的流程会在时间方面引入复杂性。
比如说,订单已下达,但付款上下文从未确认实际发生了货币交易。
如果没有付款确认,该流程将无法进行。

在某些情况下,无限期的等待可能很好,但大多数时候需要有一个关于产品是否可以交付的明确答案。
如果在两分钟内没有确认付款,我们需要一种方法来通知将来取消订单的过程。

为了处理这些定时问题,我们需要引入一个可以延迟发送消息的警报或超时服务。
下订单后,我们会告诉警报服务将向我们发送取消消息。因此,如果在接下来的几分钟内没有付款,我们将从警报中收到订单取消消息。
但是,如果已付款并且订单已通过该流程进行了推进,则在收到取消时将立即忽略它。

在设计流程时考虑时间循环非常强大,因为如果发生意外情况,流程可以进入需要人工干预的状态,以便人们可以接手工作并解决意外问题。

流程自动化的最终目标不是完全取代人工干预,而是尽可能多地自动化工作。

在下一节中,我将介绍进程管理器模式并给出一些 C# 代码示例。虽然有很多方法可以实现流程管理器模式,但我会保持简单,并尽量减少对其他框架或平台的依赖。

流程管理器模式
当在设计时可能不知道所需的步骤并且可能不是连续的时,我们如何通过多个处理步骤路由消息?企业集成模式 流程管理器的主要目的是封装流程特定的逻辑并维护一个中心控制点。它由触发消息启动,该消息可能是来自有界上下文的事件。 

流程管理器决定一旦流程完成成为开始和结束类型消息的中心后接下来要执行什么。当并行进程尝试回传其状态时,它也可能成为性能瓶颈。 这是流程管理器的最简单实现。

当有趣的事情发生时,方法将被调用:

public class OrderProcessManager
{
    public enum OrderProcessState
    {
        NotStarted,
        OrderPlaced,
        PaymentCompleted,
        OrderDispatched,
        OrderDelivered
    }
    public OrderProcessManager()
    {
        State = OrderProcessState.NotStarted;
    }
    public Guid Id { get; private set; }
    public OrderProcessState State { get; set; }
    public void When(OrderPlaced @event)
    {
    }
    public void When(PaymentCompleted @event)
    {
    }
    public void When(OrderDispatched @event)
    {
    }
    public void When(OrderDelivered @event)
    {
    }
}

触发消息是OrderPlaced事件:

public class OrderPlaced
{
    public Guid OrderId { get; set; }
    public Guid RestaurantId { get; set; }
    public Guid UserId { get; set; }
    public List<OrderItem> Items { get; set; }
    public decimal Price { get; set; }
    public Address DeliveryAddress { get; set; }
}
public class Address
{
    public string Street { get; set; }
    public string PostalCode { get; set; }
}
public class OrderItem
{
    public Guid Id { get; set; }
    public string Name { get; set; }
}


所有这些都是普通的旧 c# 代码,非常棒,但这里缺少一些基础设施。OrderPlaced 事件是如何传递的,谁创建了 OrderProcessManager 类的新实例?

我不打算在这里详细介绍基础设施,但您可以使用服务总线实现来为您传递消息。在我的示例中,我使用了 JustSaying,它是一个在 Amazon Web Services 之上运行的开源库,并使用Simple Queue ServiceSimple Notification Service为我传递消息。

状态管理
流程管理器的另一项职责是维护消息循环之间的状态。它通过使用某种持久性存储(如 SQL 数据库或 NoSQL/Document 数据库)来实现这一点。所有消息都到达消息处理程序,在这种情况下,该处理程序扮演流程管理器工厂和消息路由器的角色。

这是 OrderPlaced 处理程序的简单实现。我使用了路由器这个名称,因为它的主要目的是找到正确的流程管理器实例并将消息转发给它。

public class OrderProcessRouter:
        IHandler<OrderPlaced>
{
    private readonly IRepository<OrderProcessManager> _repository;
    public OrderProcessRouter(IRepository<OrderProcessManager> repository)
    {
        _repository = repository;
    }
    public bool Handle(OrderPlaced message)
    {
        var pm = _repository.Load(message.OrderId);
        if (pm == null)
        {
            pm = new OrderProcessManager();
        }
        pm.When(message);
        _repository.Save(pm);
        return true;
    }

这里正在发生一些事情:
首先,相关性:接收到消息时,需要识别并重新加载每个流程管理器实例。为了能够识别它,每条消息都需要与流程管理器实例共享一个相关 ID。在这种情况下,我使用 OrderId 作为相关 ID。

其次,幂等性:尽管 OrderPlaced 事件是流程管理器新实例的触发器,但路由器首先尝试通过 ID 查找流程管理器。如果该订单的流程已经开始,则意味着我们收到的消息是重复消息。在像 AWS SQS 这样的分布式消息系统中,您偶尔会收到一条重复的消息,您必须处理它。大多数分布式消息传递系统保证至少一次传递消息。

第三,消息转发/路由:路由器在这里简单地将消息交给进程管理器。最后,坚持。新创建的实例或流程管理器被持久化到存储库中。您可能在这里遇到竞争条件,另一台机器上的另一个路由器正在保存相同的实例,因此您需要在流程管理器中实现一些版本控制并在存储库中强制执行并发检查。这很关键。

事件处理
让我们看看当流程管理器收到 OrderPlaced 事件时会发生什么。

public class OrderProcessManager
{
  public Guid Id { get; set; }
  public OrderProcessState State { get; set; }
  public int Version { get; set; }
  public List<Command> CommandsToSend { get; private set; }
  public Guid RestaurantId { get; set; }
  public Guid UserId { get; set; }
  public List<OrderItem> Items { get; private set; }
  public Address DeliveryAddress { get; set; }
  public decimal Amount { get; set; }
  public void When(OrderPlaced @event)
  {
      switch (State)
      {
          case OrderProcessState.NotStarted:
              State = OrderProcessState.OrderPlaced;
              Id = @event.OrderId;
              Items = @event.Items;
              RestaurantId = @event.RestaurantId;
              UserId = @event.UserId;
              DeliveryAddress = @event.DeliveryAddress;
              Amount = @event.Amount;
              SendCommand(new ProcessPayment
              {
                  OrderId = @event.OrderId,
                  RestaurantId = @event.RestaurantId,
                  Amount = @event.Amount
              });
              break;
          // idempotence - same message sent twice
          case OrderProcessState.OrderPlaced:
              break;
          default:
              throw new InvalidOperationException(
"Invalid state for this message");
      }
  }
}

流程管理器存储所有信息并将其内部状态更改为 OrderPlaced。它还向付款“发送”命令以处理订单,传递有关订单的一些重要细节。

下面的 SendCommand 函数只是将命令添加到要发送的命令列表中,因此名称具有误导性,但我想不出更好的名称。保存流程管理器实例后,存储库将使用一些基础设施来发送命令。不过,我不会在这里展示。

private void SendCommand(Command command)
{
    CommandsToSend.Add(command);
}


此时“支付循环”开始。在支付有界的上下文中,会发生很多事情。为简单起见,我在这里没有涉及警报服务,但理想情况下,如果两分钟的付款没有发生任何事情,您希望流程管理器实例得到通知。一种方法是添加另一个要发送到警报的命令,其中包含必要的详细信息。一旦付款成功完成,PaymentCompleted 事件就会从 Payments 有界上下文中出来。流程管理器现在转到下一步,即将订单分派到餐厅进行交付。

public void When(PaymentCompleted @event)
{
    switch (State)
    {
        case OrderProcessState.OrderPlaced:
            State = OrderProcessState.PaymentCompleted;
            SendCommand(new DispatchOrder
            {
                OrderId = Id,
                RestaurantId = RestaurantId,
                Items = Items.ToList(),
                Amount = Amount,
                DeliveryAddress = DeliveryAddress
            });
            break;
        // idempotence - same message sent twice
        case OrderProcessState.PaymentCompleted:
            break;
        default:
            throw new InvalidOperationException(
"Invalid state for this message");
    }
}

这里的模式相同。状态改变了,一个新的处理循环开始了,订单调度循环。
您可以在此处https://github.com/justeat/ProcessManager