无服务器与事件溯源结合的演示案例:将事件溯源作为Azure函数的数据持久化机制的库

19-11-14 banq

简单的说,事件溯源是一种存储状态(对于实体)的方法,该状态是通过存储该实体发生的所有事件的顺序历史记录而起作用的。对实体的更改将作为新事件写入,附加到该实体的事件流的末尾。

当查询或业务流程需要使用实体的当前状态时,它会通过在事件流上运行投影来获取此信息,这是一段非常简单的代码,对于每个事件,它决定(a)我是否关心这种类型事件(b)如果是,我收到事件后该怎么办。

目标是能够与实体的事件流进行交互,而无需在Azure函数本身中进行任何额外的设置-既可以访问事件流,又可以通过在执行Azure函数时实例化的绑定变量来运行投影。

要将事件添加到事件流,可以使用事件流属性和类,因此:

[FunctionName("OpenAccount")]
public static async Task<HttpResponseMessage> OpenAccountRun(
              [HttpTrigger(AuthorizationLevel.Function, "POST", Route = "OpenAccount/{accountnumber}")]HttpRequestMessage req,
              string accountnumber,
              [EventStream("Bank", "Account", "{accountnumber}")]  EventStream bankAccountEvents)
{
    if (await bankAccountEvents.Exists())
    {
        return req.CreateResponse(System.Net.HttpStatusCode.Forbidden , $"Account {accountnumber} already exists");
    }
    else
    {
        // Get request body
        AccountOpeningData data = await req.Content.ReadAsAsync<AccountOpeningData>();

        // Append a "created" event
        DateTime dateCreated = DateTime.UtcNow;
        Account.Events.Opened evtOpened = new Account.Events.Opened() { LoggedOpeningDate = dateCreated };
        if (! string.IsNullOrWhiteSpace( data.Commentary))
        {
            evtOpened.Commentary = data.Commentary;
        }
        await bankAccountEvents.AppendEvent(evtOpened);
                
        return req.CreateResponse(System.Net.HttpStatusCode.Created , $"Account {accountnumber} created");
    }
}

从事件流中获得状态的方式称为投射 projection:

 [FunctionName("GetBalance")]
   public static async Task<HttpResponseMessage> GetBalanceRun(
     [HttpTrigger(AuthorizationLevel.Function, "GET", Route = "GetBalance/{accountnumber}")]HttpRequestMessage req,
     string accountnumber,
     [Projection("Bank", "Account", "{accountnumber}", nameof(Balance))] Projection prjBankAccountBalance)
   {
       string result = $"No balance found for account {accountnumber}";

       if (null != prjBankAccountBalance)
       {
           Balance projectedBalance = await prjBankAccountBalance.Process<Balance>(); 
           if (null != projectedBalance )
           {
               result = $"Balance for account {accountnumber} is ${projectedBalance.CurrentBalance} (As at  {projectedBalance.CurrentSequenceNumber}) ";
           }
       }
       return req.CreateResponse(System.Net.HttpStatusCode.OK, result); 
   }

这两个属性的所有属性都设置为AutoResolve,因此可以在运行时设置它们。

由于事件流本质上是仅附加系统,因此其基础的存储技术是AppendBlob-一种特殊的Blob存储类型,它仅允许将块附加到Blob的末尾。每个Blob最多可以存储50,000个事件,并且容器路径可以与任何其他Azure Blob存储相同的方式嵌套。

对于高容量流,可以使用Azure Tables后端代替AppendBlob。存储技术和存储目标的选择可以通过应用程序上的配置设置进行切换。

在该库中,必须根据需要检索实体的状态-这是为了使函数应用程序分解为零,并且实际上允许多个独立的Azure功能应用程序使用相同的基础事件流而不必使用任何“始终在线”一致性服务。

 

              

猜你喜欢