使用 事件溯源EventSourcing 的感觉非常棒。系统中有发生的事件的历史记录。这使得调试变得更加容易。
然而,事件溯源并不能保护我们免受用户犯错误的影响。
我们应该如何处理用户输入错误的情况呢?
- 我们应该删除该事件吗?
- 在数据库中找到并更新它?但是根据定义,事件是不可变的。所以给定的解决方案听起来不像合法的计划......
双时态(bi-temporal) EventSourcing 怎么样?
双时态 EventSourcing 基于两个时间维度。
- 是事件发布的时间。我们使用timestamp元数据来记录该事实。
- 事件实际生效的时间。我们使用valid_at元数据来记录实际生效的时间。
考虑一个跟踪指定公司员工的工资系统:
您可能想到了 Excel 工作表,但让我们想象一些更复杂且用户友好的东西:)
您所要做的就是修改员工的工资。然后,更改会传播到工资系统,并在一天结束时存入员工的银行帐户。
目前,还没有办法处理由负责薪资管理的人员引入的错误。
让我们使用双时态 EventSourcing 方法来处理它。
在最简单的情况下,我们只是提高员工的工资,这工作得很好:
def test_raise_salary employee_id = SecureRandom.uuid stream = "Salary$#{employee_id}" expected_events = [SalaryRaised.new(data: { salary: 10_000, employee_id: employee_id })]
aggregate_root_repository.with_aggregate(Salary.new(employee_id), stream) { |salary| salary.raise(10_000) }
assert_expected_events_in_stream(stream, expected_events) end
|
人类会犯错误。看看下面这个系统中可能发生的例子。
员工工资涨至10K
salary.raise(10000)
但是两个月后,一名员工查了工资单,结果发现没有得到涨后的新工资。理应得到 10.3K。
下面的测试展示了我们如何使用 双时态 EventSourcing 来处理该问题。
首先,我们来设定工资:
Timecop.travel(Time.utc(2022, 1, 1)) do aggregate_root_repository.with_aggregate(Salary.new(employee_id), stream) { |salary| salary.raise(10_000) } end
|
我们假装回到过去,因此在Timecop中 我们定了2022年1月1日的工资。
到了2022 年 3 月 1 日,我们已经知道我们犯了一个错误,需要修复它。因此,我们使用valid_at元数据来表示此事件从该特定日期开始有效。
Timecop.travel(Time.utc(2022, 3, 1)) do event_store.with_metadata({ valid_at: Time.utc(2022, 1, 1) }) do aggregate_root_repository.with_aggregate(Salary.new(employee_id), stream) { |salary| salary.raise(10_300) } end end
|
问题解决了,如何检查员工工资最后是否正确?
下面是我们如何从 RES 读取数据:
def salary_for_given_date(employee_id, valid_at) event_store .read .of_type(SalaryRaised) .as_of # ordered by valid_at .to_a .filter { |e| e.data.fetch(:employee_id).eql?(employee_id) } .filter { |e| e.metadata.fetch(:valid_at).to_date.eql?(valid_at.to_date) } .first .data .fetch(:salary) end
|
整个测试如下:
def test_raise_salary_but_made_mistake_that_was_found_out_later employee_id = SecureRandom.uuid stream = "Salary$#{employee_id}"
Timecop.travel(Time.utc(2022, 1, 1)) do aggregate_root_repository.with_aggregate(Salary.new(employee_id), stream) { |salary| salary.raise(10_000) } end
Timecop.travel(Time.utc(2022, 3, 1)) do event_store.with_metadata({ valid_at: Time.utc(2022, 1, 1) }) do aggregate_root_repository.with_aggregate(Salary.new(employee_id), stream) { |salary| salary.raise(10_300) } end end //这里测试最终应该是10.3k工资 assert_equal 10_300, salary_for_given_date(employee_id, Time.utc(2022, 1, 1)) end
|
更智能的函数
我们需要一个更智能的函数来找出 2022 年 1 月 1 日的工资。
假设引入数据的人犯了错字。首先,他们引入了 10350,然后是 10300。
Timecop.travel(Time.utc(2022, 3, 1)) do event_store.with_metadata({ valid_at: Time.utc(2022, 1, 1) }) do aggregate_root_repository.with_aggregate(Salary.new(employee_id), stream) do |salary| salary.raise(10_350) salary.raise(10_300) end end end
|
我们目前执行的 salary_for_given_date 会产生什么结果?
是 10350。
结果将按照 valid_at 排序,然后是单调的 event_id。
因此,我们可以在之前的函数中使用 last 而不是 first,但是......每当我为一个集合排序时,我更喜欢指定我所期望的顺序。
因此,我把代码改成了下面这段。它规定最新的修改总是优先:
def salary_for_given_date(employee_id, valid_at) event_store .read .of_type(SalaryRaised) .as_of # ordered by valid_at .to_a .filter { |e| e.data.fetch(:employee_id).eql?(employee_id) } .filter { |e| e.metadata.fetch(:valid_at).to_date.eql?(valid_at.to_date) } .sort { |a, b| b.metadata.fetch(:timestamp) <=> a.metadata.fetch(:timestamp) } .first .data .fetch(:salary) end
|
更好例子
在我们的演示应用程序电子商务中,存在一个有关“计划以后价格”的问题。这是使用我们不久前推出的双时态 EventSourcing 功能的好机会。
我们没有更改域代码,而是使用了双时态 EventSourcing 功能。因此,我们需要一个新的命令及其处理程序。
class SetFuturePriceHandler def initialize(event_store) @repository = Infra::AggregateRootRepository.new(event_store) @event_store = event_store end
def call(cmd) @event_store.with_metadata({ valid_at: cmd.valid_since }) do @repository.with_aggregate(Product, cmd.product_id) do |product| product.set_price(cmd.price) end end end end
|
最重要的是以下几行
@event_store.with_metadata({ valid_at: cmd.valid_since }) do @repository.with_aggregate(Product, cmd.product_id) do |product| product.set_price(cmd.price) end end
|
我们在事件存储中添加额外的 valid_at 元数据,它是另一个时间维度,决定事件何时生效。
到底是在哪个时间点上生效?
产品聚合发布 PriceSet 事件时会用到这个元数据。测试代码如下:
def test_check_future_price product_1_id = SecureRandom.uuid set_price(product_1_id, 20) future_date_timestamp = Time.now.utc + plus_five_days set_future_price(product_1_id, 30, future_date_timestamp.to_s)
Timecop.travel(future_date_timestamp + 2137) do order_id = SecureRandom.uuid add_item(order_id, product_1_id) stream = "Pricing::Order$#{order_id}"
assert_events( stream, OrderTotalValueCalculated.new( data: { order_id: order_id, discounted_amount: 30, total_amount: 30 } ) ) { calculate_total_value(order_id) } end end
|
正如您所看到的,当我们穿越到未来时,会使用更新的价格。
可以通过克隆电子商务并亲自尝试这些测试来亲自检查。
双时态 EventSourcing 是未来定价的好方法
- 元数据只是事件的一部分
- 事件是领域层的一部分。
- 因此,使用双时态 EventSourcing 处理此类问题是完全有意义的。
这个解决方案非常简单。它不需要我们对领域模型进行任何更改。