任何基础设施的关键部分之一是存储。与传统的关系模型相比,将事件存储在日志中非常简单。但是,当您体验到成功产品的好运时,即使是日志式存储也必须不断发展才能跟上。
幼稚的实现
当我开始使用事件溯源时,我想尽可能简单。老实说,我无法理解常见事件溯源数据库实现的许多陷阱。
CREATE TABLE Event |
这种配置对我们来说效果很好,性能很好(即使在 t2.micro 实例上),而且理解起来也相对简单。如果这是一个内部系统,我可能会停在那里。但是对于多租户应用程序,在某些时候这会遇到扩展障碍。
事件表
让我们从事件表本身开始。尽管这个更新后的表几乎相同,但我将继续解释每个字段和索引的用途。
CREATE TABLE IF NOT EXISTS Event |
SequenceNum
事件侦听器使用它来跟踪它们在流中的当前位置。这里唯一真正重要的是保留事件保存的顺序,自动增量大整数可以很好地做到这一点。
StreamId
流的标识符。我选择了 uuid,但身份有很多可行的选择。
Version
该字段在以前的版本中不存在,但作为优化添加。如果您将流视为一个数组,这就是事件的索引号。将其与事件一起保存有助于避免手动计数。
Data
这是事件的序列化数据。我选择了 jsonb 格式,但还有其他选择。如果我有更多资源来开发自己的内省工具,我可能会使用像 Protocol Buffers 这样的二进制序列化来提高速度。
Type
这是事件的类型。存储它支持过滤...以避免在侦听器不使用事件时获取/反序列化。我也用它来反序列化。
Meta
这支持审计和跟踪。我在这里放了一些东西,比如用户、执行权限、相关 ID 等。
LogDate
除了审计和跟踪之外,这还可以用于跨分片排序。尽管在业务逻辑中使用 LogDate 很诱人,但它只能用于基础设施目的。如果需要时间上下文,事件应该包含它们自己的时间戳。但我承认我使用 LogDate 进行报告。
索引/键
SequenceNum首要的关键
允许基于位置的高效查询。IEWHERE SequenceNum > @LastSeenSeqNum
StreamId, Version唯一键
允许按版本顺序有效加载特定流。Postgres 可以使用此索引进行处理ORDER BY Version,而无需额外的排序步骤。
StreamId外键
这主要是“训练轮”,以确保面对错误时的数据完整性。一旦代码经过实战验证,就可以将其删除以提高性能。
流Stream表
我们想要添加到事件存储的功能之一是支持并发编写器。并发编写者意味着:我可以部署我的业务服务的多个副本(那些生成事件的东西,也就是命令处理程序),而不需要它们之间的锁定或协调。我们甚至可以在 Lambda 等“无服务器”架构上运行这些,以按需自动扩展计算资源。这是通过使用乐观并发来实现的。你可以把这想象成 git 中的合并冲突——两个不同的分支独立地对同一行代码进行了更改。在这种情况下,两个独立的用户试图同时对同一个实体进行更改。这是我们用来检测的表。
CREATE TABLE IF NOT EXISTS Stream |
该表的核心只是跟踪每个流的当前版本。这是使用此表要遵循的具体过程。
- 在执行业务逻辑之前,获取流的当前版本。
- 执行业务逻辑,生成新事件。
- 仅在以下情况下保存事件(在事务中)
- Stream 表中的版本与您开始时的版本相同
- 然后还将 Stream 表更新为上次保存的版本
- 指示事件是否已保存。例如返回真/假
这确保编写者知道他们何时尝试保存冲突事件,并且他们可以返回错误、重试或业务规则定义的某些其他冲突解决过程。默认情况下,我们可能只会出错,直到有理由不这样做。
快照表
当流变得非常大(可能 > 1000 个事件?)时,加载和重放流以获取当前状态可能会变得太慢。对此的常见缓解措施是使用快照。我们不是每次都从头开始重建域模型,而是偶尔重建到域模型状态的最新版本并将其保存到数据库中。之后,为了加载流,我们首先获取快照,然后仅获取自快照版本以来的事件。这是支持这一点的表格。
CREATE TABLE IF NOT EXISTS Snapshot |
当流“太大”时,应用程序代码应该创建一个快照(这应该由请求时间等指标确定)。还应不时重新创建快照(在 1000 多个事件之后)以及对快照进行结构更改时 - 请参阅下面的修订。
快照如何保存?
最好的方法可能是拥有一个单独的按计划运行的快照服务。它可以使用 Stream 表来识别具有大量事件的流。它还可以检查现有的快照版本以找到需要更新的版本。构建事件存储中描述了一次完成这两项检查的查询。此外,可以检查快照修订以重新创建结构更改的快照。
现在确定需要快照的流。下一步是调用适当的域模型代码以将流重播到当前状态。然后该状态被保存到快照表中。Stream.Type 可用于确定要调用哪个流重放代码。如果您没有选择在 Stream 表中包含 Type 列,那么您还可以读取流的第一个事件。通常这些表示它是流的类型。
多租户
当多个租户属于同一个父组织时提供汇总报告。或立即测量系统上的活动。当通过表分离租户时,生成一个查询所有租户的事件是很痛苦的。
分区
一种在逻辑上分离租户数据但又具有单个 Event 表的便利的方法是使用 Postgres 的表分区。他们在最近的版本中为此功能添加了很多内容。而在 Postgres 11 中,这个特性尤其强大。这是按租户 ID 分区的事件表版本。它看起来几乎一样!
CREATE TABLE IF NOT EXISTS Event |
主表上的约束需要包含租户分区键,这里是TenantId。
租户分区
不幸的是,Postgres 还没有自动创建分区。因此,在为租户插入数据之前,您首先必须为其创建一个分区。添加新租户时通常会有一个配置过程,因此分区的创建可以简单地成为该过程的一部分。下面是使用 TenantId = 847ED1889E8B4D238EB49126EBD77A4D 创建租户的示例。
CREATE TABLE IF NOT EXISTS Event_847ED1889E8B4D238EB49126EBD77A4D |
在幕后,每个分区都有自己的索引。所以你最终不会得到一个巨大的索引。当您跨租户查询数据时,Postgres 可以在分区上并行运行查询,然后聚合它们。
插入事件
插入数据就像将数据插入单个表一样,Postgres 会自动将其路由到适当的分区。
INSERT |
按顺序查询
要支持按发生顺序跨租户读取事件,您可以运行这样的查询。
SELECT * |
此查询支持读取最多 1000 个批次。我避免使用OFFSET,因为一旦偏移值变大,它的效率就会很低。并且每个侦听器通常都会跟踪它处理的最后一个 SequenceNum。
我可以在 WHERE 子句中添加另一个条件,AND SequenceNum <= 1000而不是使用LIMIT. 但是由于并发性可能会跳过序列号(见下文)。虽然这是一个小问题。
按流查询
流式查询和以前一样,只是我们现在还需要提供 TenantId。
SELECT * |
事件通知
每当将事件添加到事件表的任何分区时,您都可以触发 Postgres 通知。这是我目前使用的。
DROP TRIGGER IF EXISTS trg_EventRecorded ON Event; |
当一个事件被插入时,它会触发一个通知到通道eventrecorded。
事件本身可能太大而无法容纳在通知负载中。因此,相反,我提供了足够的有关事件的数据,让听众知道他们是否想麻烦加载它。通常监听者只关心事件的类型和序列号。然后他们通常从他们处理的最后一个序列号开始加载一批事件。所以中间位可能是 YAGNI 违规。但是如果监听器想要加载特定事件而不是批处理,则包含在内感觉是正确的。
要监听事件通知,SQL 部分很简单:
LISTEN eventrecorded; |
租户数据现在也很容易单独删除或备份,因为它实际上位于自己的单独表中。例如假设 TenantId = 847ED1889E8B4D238EB49126EBD77A4D 要求我们删除他们的数据,那么很容易摆脱它。
-- DANGER DANGER |
当数据集变大时,这种按租户划分事件流的方法可以提高性能。在某些时候,我们可能需要更进一步,将我们的数据分区到单独的数据库节点上。这种基本表结构似乎也适合使用 Citus Data(最近被 Microsoft 收购)之类的基于节点的分区。但是,当一个总体索引成为性能瓶颈时,上述方法应该可以解决这个尴尬的第一个扩展障碍。它并不比一张表更费力!