Postgres事件溯源的存储表设计 - DEV


任何基础设施的关键部分之一是存储。与传统的关系模型相比,将事件存储在日志中非常简单。但是,当您体验到成功产品的好运时,即使是日志式存储也必须不断发展才能跟上。
幼稚的实现
当我开始使用事件溯源时,我想尽可能简单。老实说,我无法理解常见事件溯源数据库实现的许多陷阱。

CREATE TABLE Event
(
    SequenceNum bigserial NOT NULL,
    StreamId uuid NOT NULL,
    Data jsonb NOT NULL,
    Type text NOT NULL,
    Meta jsonb NOT NULL,
    LogDate timestamptz NOT NULL DEFAULT now(),
    PRIMARY KEY (SequenceNum)
);

CREATE INDEX idx_event_streamid ON Event (StreamId);

这种配置对我们来说效果很好,性能很好(即使在 t2.micro 实例上),而且理解起来也相对简单。如果这是一个内部系统,我可能会停在那里。但是对于多租户应用程序,在某些时候这会遇到扩展障碍。

事件表
让我们从事件表本身开始。尽管这个更新后的表几乎相同,但我将继续解释每个字段和索引的用途。

CREATE TABLE IF NOT EXISTS Event
(
    SequenceNum bigserial NOT NULL,
    StreamId uuid NOT NULL,
    Version int NOT NULL,
    Data jsonb NOT NULL,
    Type text NOT NULL,
    Meta jsonb NOT NULL,
    LogDate timestamptz NOT NULL DEFAULT now(),
    PRIMARY KEY (SequenceNum),
    UNIQUE (StreamId, Version),
    FOREIGN KEY (StreamId)
        REFERENCES Stream (StreamId)
);

SequenceNum
事件侦听器使用它来跟踪它们在流中的当前位置。这里唯一真正重要的是保留事件保存的顺序,自动增量大整数可以很好地做到这一点。

StreamId
流的标识符。我选择了 uuid,但身份有很多可行的选择。

Version
该字段在以前的版本中不存在,但作为优化添加。如果您将流视为一个数组,这就是事件的索引号。将其与事件一起保存有助于避免手动计数。

Data
这是事件的序列化数据。我选择了 jsonb 格式,但还有其他选择。如果我有更多资源来开发自己的内省工具,我可能会使用像 Protocol Buffers 这样的二进制序列化来提高速度。

Type
这是事件的类型。存储它支持过滤...以避免在侦听器不使用事件时获取/反序列化。我也用它来反序列化。

Meta
这支持审计和跟踪。我在这里放了一些东西,比如用户、执行权限、相关 ID 等。

LogDate
除了审计和跟踪之外,这还可以用于跨分片排序。尽管在业务逻辑中使用 LogDate 很诱人,但它只能用于基础设施目的。如果需要时间上下文,事件应该包含它们自己的时间戳。但我承认我使用 LogDate 进行报告。

索引/键
SequenceNum首要的关键
允许基于位置的高效查询。IEWHERE SequenceNum > @LastSeenSeqNum

StreamId, Version唯一键
允许按版本顺序有效加载特定流。Postgres 可以使用此索引进行处理ORDER BY Version,而无需额外的排序步骤。

StreamId外键
这主要是“训练轮”,以确保面对错误时的数据完整性。一旦代码经过实战验证,就可以将其删除以提高性能。

流Stream表
我们想要添加到事件存储的功能之一是支持并发编写器。并发编写者意味着:我可以部署我的业务服务的多个副本(那些生成事件的东西,也就是命令处理程序),而不需要它们之间的锁定或协调。我们甚至可以在 Lambda 等“无服务器”架构上运行这些,以按需自动扩展计算资源。这是通过使用乐观并发来实现的。你可以把这想象成 git 中的合并冲突——两个不同的分支独立地对同一行代码进行了更改。在这种情况下,两个独立的用户试图同时对同一个实体进行更改。这是我们用来检测的表。

CREATE TABLE IF NOT EXISTS Stream
(
    StreamId uuid NOT NULL,
    Version int NOT NULL,
    Type text NOT NULL,
    PRIMARY KEY (StreamId)
);

CREATE INDEX IF NOT EXISTS idx_stream_type ON Stream (Type);

该表的核心只是跟踪每个流的当前版本。这是使用此表要遵循的具体过程。

  • 在执行业务逻辑之前,获取流的当前版本。
  • 执行业务逻辑,生成新事件。
  • 仅在以下情况下保存事件(在事务中)
    • Stream 表中的版本与您开始时的版本相同
    • 然后还将 Stream 表更新为上次保存的版本
    • 指示事件是否已保存。例如返回真/假

这确保编写者知道他们何时尝试保存冲突事件,并且他们可以返回错误、重试或业务规则定义的某些其他冲突解决过程。默认情况下,我们可能只会出错,直到有理由不这样做。

快照表
当流变得非常大(可能 > 1000 个事件?)时,加载和重放流以获取当前状态可能会变得太慢。对此的常见缓解措施是使用快照。我们不是每次都从头开始重建域模型,而是偶尔重建到域模型状态的最新版本并将其保存到数据库中。之后,为了加载流,我们首先获取快照,然后仅获取自快照版本以来的事件。这是支持这一点的表格。

CREATE TABLE IF NOT EXISTS Snapshot
(
    StreamId uuid NOT NULL,
    Version int NOT NULL,
    Data jsonb NOT NULL,
    Revision int NOT NULL,
    PRIMARY KEY (StreamId)
);

当流“太大”时,应用程序代码应该创建一个快照(这应该由请求时间等指标确定)。还应不时重新创建快照(在 1000 多个事件之后)以及对快照进行结构更改时 - 请参阅下面的修订。

快照如何保存?
最好的方法可能是拥有一个单独的按计划运行的快照服务。它可以使用 Stream 表来识别具有大量事件的流。它还可以检查现有的快照版本以找到需要更新的版本。构建事件存储中描述了一次完成这两项检查的查询。此外,可以检查快照修订以重新创建结构更改的快照。
现在确定需要快照的流。下一步是调用适当的域模型代码以将流重播到当前状态。然后该状态被保存到快照表中。Stream.Type 可用于确定要调用哪个流重放代码。如果您没有选择在 Stream 表中包含 Type 列,那么您还可以读取流的第一个事件。通常这些表示它是流的类型。

多租户
当多个租户属于同一个父组织时提供汇总报告。或立即测量系统上的活动。当通过表分离租户时,生成一个查询所有租户的事件是很痛苦的。
分区
一种在逻辑上分离租户数据但又具有单个 Event 表的便利的方法是使用 Postgres 的表分区。他们在最近的版本中为此功能添加了很多内容。而在 Postgres 11 中,这个特性尤其强大。这是按租户 ID 分区的事件表版本。它看起来几乎一样!

CREATE TABLE IF NOT EXISTS Event
(
    SequenceNum bigserial NOT NULL,
    TenantId uuid NOT NULL,
    StreamId uuid NOT NULL,
    Version int NOT NULL,
    Type text NOT NULL,
    Meta jsonb NOT NULL,
    Data jsonb,
    LogDate timestamptz NOT NULL DEFAULT now(),
    CONSTRAINT pk_event_sequencenum PRIMARY KEY (TenantId, SequenceNum),
    CONSTRAINT uk_event_streamid_version UNIQUE (TenantId, StreamId, Version)
) PARTITION BY LIST (TenantId);

主表上的约束需要包含租户分区键,这里是TenantId。

租户分区
不幸的是,Postgres 还没有自动创建分区。因此,在为租户插入数据之前,您首先必须为其创建一个分区。添加新租户时通常会有一个配置过程,因此分区的创建可以简单地成为该过程的一部分。下面是使用 TenantId = 847ED1889E8B4D238EB49126EBD77A4D 创建租户的示例。

CREATE TABLE IF NOT EXISTS Event_847ED1889E8B4D238EB49126EBD77A4D
PARTITION OF Event FOR VALUES IN ('847ED1889E8B4D238EB49126EBD77A4D');

在幕后,每个分区都有自己的索引。所以你最终不会得到一个巨大的索引。当您跨租户查询数据时,Postgres 可以在分区上并行运行查询,然后聚合它们。

插入事件
插入数据就像将数据插入单个表一样,Postgres 会自动将其路由到适当的分区。

INSERT
  INTO Event
     ( TenantId
     , StreamId
     , Version
     , Type
     , Meta
     , Data
     )
VALUES
     ( '847ED1889E8B4D238EB49126EBD77A4D'
     , 'A88F94DB6E7A439E9861485F63CC8A13'
     , 1
     , 'EmptyEvent'
     , '{}'
     , NULL
     )
;

按顺序查询
要支持按发生顺序跨租户读取事件,您可以运行这样的查询。
SELECT *
  FROM Event
 WHERE SequenceNum > 0
 ORDER
    BY SequenceNum
 LIMIT 1000
;

此查询支持读取最多 1000 个批次。我避免使用OFFSET,因为一旦偏移值变大,它的效率就会很低。并且每个侦听器通常都会跟踪它处理的最后一个 SequenceNum。

我可以在 WHERE 子句中添加另一个条件,AND SequenceNum <= 1000而不是使用LIMIT. 但是由于并发性可能会跳过序列号(见下文)。虽然这是一个小问题。

按流查询
流式查询和以前一样,只是我们现在还需要提供 TenantId。

SELECT *
  FROM Event
 WHERE TenantId = '847ED1889E8B4D238EB49126EBD77A4D'
   AND StreamId = 'A88F94DB6E7A439E9861485F63CC8A13'
 ORDER
    BY Version
;

事件通知
每当将事件添加到事件表的任何分区时,您都可以触发 Postgres 通知。这是我目前使用的。

DROP TRIGGER IF EXISTS trg_EventRecorded ON Event;
DROP FUNCTION IF EXISTS NotifyEvent();

CREATE FUNCTION NotifyEvent() RETURNS trigger AS $$

    DECLARE
        payload text;

    BEGIN
        -- { sequencenum }/{ tenant id }/{ stream id }/{ version }/{ event type }
        SELECT CONCAT_WS( '/'
                        , NEW.SequenceNum
                        , REPLACE(CAST(NEW.TenantId AS text), '-', '')
                        , REPLACE(CAST(NEW.StreamId AS text), '-', '')
                        , NEW.Version
                        , NEW.Type
                        )
          INTO payload
        ;

        PERFORM pg_notify('eventrecorded', payload);

        RETURN NULL;

    END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_EventRecorded
    AFTER INSERT ON Event
    FOR EACH ROW
    EXECUTE PROCEDURE NotifyEvent()
;

当一个事件被插入时,它会触发一个通知到通道eventrecorded。

事件本身可能太大而无法容纳在通知负载中。因此,相反,我提供了足够的有关事件的数据,让听众知道他们是否想麻烦加载它。通常监听者只关心事件的类型和序列号。然后他们通常从他们处理的最后一个序列号开始加载一批事件。所以中间位可能是 YAGNI 违规。但是如果监听器想要加载特定事件而不是批处理,则包含在内感觉是正确的。
要监听事件通知,SQL 部分很简单:

LISTEN eventrecorded;

租户数据现在也很容易单独删除或备份,因为它实际上位于自己的单独表中。例如假设 TenantId = 847ED1889E8B4D238EB49126EBD77A4D 要求我们删除他们的数据,那么很容易摆脱它。

-- DANGER DANGER
DROP TABLE Event_847ED1889E8B4D238EB49126EBD77A4D CASCADE;

当数据集变大时,这种按租户划分事件流的方法可以提高性能。在某些时候,我们可能需要更进一步,将我们的数据分区到单独的数据库节点上。这种基本表结构似乎也适合使用 Citus Data(最近被 Microsoft 收购)之类的基于节点的分区。但是,当一个总体索引成为性能瓶颈时,上述方法应该可以解决这个尴尬的第一个扩展障碍。它并不比一张表更费力!