使用事务发件箱进行可靠的事件调度


在系统中使用事件很棒,但是您如何确定您是否可靠地调度了事件?事件的传输需要可靠地完成,同时保持整个系统的一致性,无论是最终的还是立即的。在典型的设置中,数据库用于存储信息,队列用于在进程和系统之间发送消息。通常,事件在将信息存储在数据中的同一操作中直接分派到队列。这看起来似乎并没有明显的错误,但这种做法存在一个潜在的问题。

非事务调度
当事件直接分派到队列时,单个请求或操作中会发生两个网络请求,这会产生问题。两个网络请求不能成为原子请求(同时失败或成功)。
为了说明这一点,以下是可能发生的情况的简要概述:

先存储数据,再发送消息:

  • 如果存储数据失败,则操作失败,不幸的是但不会导致不一致。
  • 如果存储数据没问题,但发布事件失败,则说明外部世界没有保持最新。

发送消息,然后存储数据(永远不要这样做):
  • 如果发送消息失败,则操作失败,不幸但不会导致不一致。但是问问你自己,你会因为这个而回滚吗?
  • 如果调度事件没问题,但存储数据失败,则外部世界领先于内部一致性(这太疯狂了)。

计算机科学法确实限制了我们的选择,我们需要重新考虑我们的方法。尽管可以通过重试来降低发生这种情况的可能性,但您永远无法完全避免它。在一定程度上,这些不一致会伤害你。让我们看看交易发件箱如何提供帮助。

事务调度
我们的目标是使事件的分派失败或成功与存储应用程序状态一起。为此,我们需要确保在用于在数据库中存储状态的同一操作中分派事件。这意味着我们需要将事件存储在数据库中的缓冲表中。通过这样做,我们有效地将数据库用作队列。现在我已经可以听到你说“但是使用数据库作为队列是不好的”,一般来说我会同意你的看法。使用数据库作为队列通常不是一个好主意,那么为什么在这种情况下可以呢?

与许多软件事物一样,一切都与权衡有关。虽然将数据库用于队列是一个糟糕的默认设置,但它提供的一致性对于您的用例来说可能更为重要。如果我们想确保我们没有丢失任何消息,一致性是最重要的方面,证明权衡是合理的。

限制数据库的使用仍然很重要,我们可以通过仅使用它来发布消息来做到这一点。常规队列仍然是负责将消息实际传递给消费者的基础设施。队列对消费者来说仍然好得多,它们提供了更精细的功能,例如路由、重试、死信,等等。

从鸟瞰图来看,整个过程是这样的:

这种设置虽然更复杂,但非常可靠。我们可以保证当且仅当成功存储应用程序的状态时才发送消息或事件。

让我们更深入地挖掘一下,看看需要什么才能使它对我们有效。

所需设置
良好的交易发件箱设置需要几个元素,让我们逐一分析:

  1. 支持事务的数据库 大多数典型的关系数据库都可以正常工作;PostgreSQL、MySQL、MariaDB,选择你的毒药。
  2. 调度事件的缓冲表 在您的数据库中,创建一个可以存储事件的表。该表和包含您的应用程序状态的表需要在同一事务中可用。
  3. 中继机制 当事件被发布到缓冲表时,它们需要被消费并转发到队列。执行此操作的组件通常称为继电器。您可以自己编写守护程序脚本,也可以使用现成的解决方案,例如Debezium
  4. 一个实际的队列 消息由中继从缓冲表中消费并最终发布到队列中,供所有感兴趣的各方消费。

把它们绑在一起
现在我们知道不同的部分是什么,我们如何让这一切工作?好吧,我很高兴你问了。在伪代码中,工作流的简化视图如下所示:

database.beginTransaction();
try {
    domainModelRepository.persist(domainModel);
    databaseOutbox.dispatch(events);
} catch (e) {
    database.rollback():
    throw e;
}
database.commitTransaction();

在执行常规数据库操作时,我们使用事务来使多个查询同时成功或失败。每个事务包装多个语句,如果其中任何一个失败,所有操作都会回滚。对于发件箱,这意味着事件的插入发生在与更新应用程序状态的查询相同的数据库事务中。通过使用事务,存储状态和调度事件要么都成功,要么都失败。

现在我们有了事务调度!

获取消息给消费者
当然,消息需要最终到达消费者。名为中继的后台进程拾取消息并将它们中继(转发)到队列。消费者消费队列中的消息,就好像什么都没发生过一样,但我们知道得更多!

重要的实施方面
在实施发件箱时,请务必牢记几件事。

  • 发件箱表应该具有独占读取发件 箱表可以被多个进程读取,这可能会导致消费者的消息重复,因为相同的消息会被中继多次。虽然理想情况下消费者应该是幂等的,但这种重复会导致信噪比上升。为了补救这个发件箱中继应该使用并发锁定机制来防止竞争读取。多个消费者在每个可用区运行以确保可靠性,同时锁定机制可防止双重中继。
  • 发件箱表按顺序 使用 中继按输入顺序使用发件箱表。如果中继无法中继消息,中继将被阻止。因此,确保中继保持哑性很重要,这意味着它只负责从 A 点到 B 点获取消息。在中继过程中直接执行应用程序代码是绝对不行的。在消费者失败的情况下,队列将被阻塞,直到部署补救措施。与像 RabbitMQ 这样的队列相比,这是根本不同的。RabbitMQ 允许乱序消费,允许在稍后的时间点处理失败的消息。
  • 发件箱的吞吐量有限 与 RabbitMQ 等队列或 Kafka 等流式消息传递平台相比,发件箱表的吞吐量有所降低。您可以通过向发件箱表添加分片机制来提高吞吐量。通过这种方式,您可以增加中继数量以匹配分片数量,这将增加吞吐量。无论采用哪种方法,建议为每个业务流程使用特定的发件箱,以确保每个业务流程的吞吐量不受另一个业务流程的影响。
  • 如果不进行清理,发件箱将无限增长 发件箱表是缓冲表。缓冲区可以增长,但最终应该会缩小。确保考虑缓冲区清理,从中删除不再需要的消息。这可以使用 cronjob 来完成,或者您的中继可以在消息被使用时删除它们。
  • 发件箱表可用于灾难恢复 如果将发件箱配置为存储消息更长时间,则可以通过将发件箱重新用于灾难恢复案例来利用引入发件箱的投资。事件的重新调度可以使消费者恢复最新状态,而无需复杂的跨系统同步机制。

结论
事务性发件箱模式是一种被严重低估且可能被低估的解决现实世界问题的模式。如果您正在按比例处理系统、使用事件并关心一致性,那么您可能应该使用发件箱!