使用双时态 EventSourcing 修复过去并应对未来


使用 事件溯源EventSourcing 的感觉非常棒。系统中有发生的事件的历史记录。这使得调试变得更加容易。

然而,事件溯源并不能保护我们免受用户犯错误的影响。

我们应该如何处理用户输入错误的情况呢?

  • 我们应该删除该事件吗?
  • 在数据库中找到并更新它?但是根据定义,事件是不可变的。所以给定的解决方案听起来不像合法的计划......

双时态(bi-temporal) EventSourcing 怎么样?
双时态 EventSourcing 基于两个时间维度。

  1. 是事件发布的时间。我们使用timestamp元数据来记录该事实。
  2. 事件实际生效的时间。我们使用valid_at元数据来记录实际生效的时间。

考虑一个跟踪指定公司员工的工资系统:
您可能想到了 Excel 工作表,但让我们想象一些更复杂且用户友好的东西:)
您所要做的就是修改员工的工资。然后,更改会传播到工资系统,并在一天结束时存入员工的银行帐户。

目前,还没有办法处理由负责薪资管理的人员引入的错误。
让我们使用双时态 EventSourcing 方法来处理它。

在最简单的情况下,我们只是提高员工的工资,这工作得很好:

def test_raise_salary
  employee_id = SecureRandom.uuid
  stream = "Salary$#{employee_id}"
  expected_events = [SalaryRaised.new(data: { salary: 10_000, employee_id: employee_id })]

  aggregate_root_repository.with_aggregate(Salary.new(employee_id), stream) { |salary| salary.raise(10_000) }

  assert_expected_events_in_stream(stream, expected_events)
end


人类会犯错误。看看下面这个系统中可能发生的例子。
员工工资涨至10K
salary.raise(10000)

但是两个月后,一名员工查了工资单,结果发现没有得到涨后的新工资。理应得到 10.3K。

下面的测试展示了我们如何使用 双时态 EventSourcing 来处理该问题。

首先,我们来设定工资:

Timecop.travel(Time.utc(2022, 1, 1)) do
  aggregate_root_repository.with_aggregate(Salary.new(employee_id), stream) { |salary| salary.raise(10_000) }
end

我们假装回到过去,因此在Timecop中 我们定了2022年1月1日的工资。

到了2022 年 3 月 1 日,我们已经知道我们犯了一个错误,需要修复它。因此,我们使用valid_at元数据来表示此事件从该特定日期开始有效。

Timecop.travel(Time.utc(2022, 3, 1)) do
  event_store.with_metadata({ valid_at: Time.utc(2022, 1, 1) }) do
    aggregate_root_repository.with_aggregate(Salary.new(employee_id), stream) { |salary| salary.raise(10_300) }
  end
end

问题解决了,如何检查员工工资最后是否正确?

下面是我们如何从 RES 读取数据:

def salary_for_given_date(employee_id, valid_at)
  event_store
    .read
    .of_type(SalaryRaised)
    .as_of # ordered by valid_at
    .to_a
    .filter { |e| e.data.fetch(:employee_id).eql?(employee_id) }
    .filter { |e| e.metadata.fetch(:valid_at).to_date.eql?(valid_at.to_date) }
    .first
    .data
    .fetch(:salary)
end

整个测试如下:

def test_raise_salary_but_made_mistake_that_was_found_out_later
  employee_id = SecureRandom.uuid
  stream = "Salary$#{employee_id}"

  Timecop.travel(Time.utc(2022, 1, 1)) do
    aggregate_root_repository.with_aggregate(Salary.new(employee_id), stream) { |salary| salary.raise(10_000) }
  end

  Timecop.travel(Time.utc(2022, 3, 1)) do
    event_store.with_metadata({ valid_at: Time.utc(2022, 1, 1) }) do
      aggregate_root_repository.with_aggregate(Salary.new(employee_id), stream) { |salary| salary.raise(10_300) }
    end
  end
 
//这里测试最终应该是10.3k工资
  assert_equal 10_300, salary_for_given_date(employee_id, Time.utc(2022, 1, 1))
end

更智能的函数
我们需要一个更智能的函数来找出 2022 年 1 月 1 日的工资。

假设引入数据的人犯了错字。首先,他们引入了 10350,然后是 10300。

Timecop.travel(Time.utc(2022, 3, 1)) do
  event_store.with_metadata({ valid_at: Time.utc(2022, 1, 1) }) do
    aggregate_root_repository.with_aggregate(Salary.new(employee_id), stream) do |salary|
      salary.raise(10_350)
      salary.raise(10_300)
    end
  end
end

我们目前执行的 salary_for_given_date 会产生什么结果?

是 10350。

结果将按照 valid_at 排序,然后是单调的 event_id。
因此,我们可以在之前的函数中使用 last 而不是 first,但是......每当我为一个集合排序时,我更喜欢指定我所期望的顺序。

因此,我把代码改成了下面这段。它规定最新的修改总是优先:

def salary_for_given_date(employee_id, valid_at)
  event_store
    .read
    .of_type(SalaryRaised)
    .as_of # ordered by valid_at
    .to_a
    .filter { |e| e.data.fetch(:employee_id).eql?(employee_id) }
    .filter { |e| e.metadata.fetch(:valid_at).to_date.eql?(valid_at.to_date) }
    .sort { |a, b| b.metadata.fetch(:timestamp) <=> a.metadata.fetch(:timestamp) }
    .first
    .data
    .fetch(:salary)
end

更好例子
在我们的演示应用程序电子商务中,存在一个有关“计划以后价格”的问题。这是使用我们不久前推出的双时态 EventSourcing 功能的好机会。

我们没有更改域代码,而是使用了双时态 EventSourcing 功能。因此,我们需要一个新的命令及其处理程序。

class SetFuturePriceHandler
  def initialize(event_store)
    @repository = Infra::AggregateRootRepository.new(event_store)
    @event_store = event_store
  end

  def call(cmd)
    @event_store.with_metadata({ valid_at: cmd.valid_since }) do
      @repository.with_aggregate(Product, cmd.product_id) do |product|
        product.set_price(cmd.price)
      end
    end
  end
end

最重要的是以下几行

@event_store.with_metadata({ valid_at: cmd.valid_since }) do
  @repository.with_aggregate(Product, cmd.product_id) do |product|
    product.set_price(cmd.price)
  end
end

我们在事件存储中添加额外的 valid_at 元数据,它是另一个时间维度,决定事件何时生效。
到底是在哪个时间点上生效?
产品聚合发布 PriceSet 事件时会用到这个元数据。

测试代码如下:

 def test_check_future_price
       product_1_id = SecureRandom.uuid
       set_price(product_1_id, 20)
       future_date_timestamp = Time.now.utc + plus_five_days
       set_future_price(product_1_id, 30, future_date_timestamp.to_s)

       Timecop.travel(future_date_timestamp + 2137) do
         order_id = SecureRandom.uuid
         add_item(order_id, product_1_id)
         stream = "Pricing::Order$#{order_id}"

         assert_events(
           stream,
           OrderTotalValueCalculated.new(
             data: {
               order_id: order_id,
               discounted_amount: 30,
               total_amount: 30
             }
           )
         ) { calculate_total_value(order_id) }
       end
     end

正如您所看到的,当我们穿越到未来时,会使用更新的价格。

可以通过克隆电子商务并亲自尝试这些测试来亲自检查。

双时态 EventSourcing 是未来定价的好方法

  1. 元数据只是事件的一部分
  2. 事件是领域层的一部分。
  3. 因此,使用双时态 EventSourcing 处理此类问题是完全有意义的。

这个解决方案非常简单。它不需要我们对领域模型进行任何更改。