基于Postgres逻辑复制的推送式发件箱模式


只有几个模式让我觉得很舒服:“如果你想构建成熟的系统,你应该一直使用它”。其中之一是发件箱模式。为什么?因为它保证了你的业务流程和沟通不会卡在中间。
正如我在发件箱中解释的,收件箱模式和交付保证解释了:
发件箱模式确保消息至少成功发送(例如发送到队列)一次。使用这种模式,我们不是直接将消息发布到队列中,而是将其存储在临时存储中(例如数据库表)。我们用工作单元(事务)包装实体保存和消息存储。这样,我们确保在存储应用程序数据时消息不会丢失。稍后将通过后台进程发布。此过程将检查表中是否有任何未发送的消息。当工人找到任何东西时,它会尝试发送它们。在它得到发布的确认(例如来自队列的ACK)后,它会将事件标记为已发送。

为什么它提供至少一次而不是完全一次?写入数据库可能会失败(例如,它不会响应)。发生这种情况时,处理发件箱模式的进程将在一段时间后尝试重新发送事件并尝试这样做,直到消息在数据库中正确标记为已发送。_

我们可以使用发件箱模式发布任何类型的消息。通常我们发送命令或发布事件(请参阅What's the difference between a command and an event?了解语义为何重要)。

如果您使用的是关系数据库,通常需要为消息创建表。

示例结构如下所示:

CREATE TABLE publications(
-- the autoincremented position of the message to respect the order
position BIGSERIAL PRIMARY KEY,
-- this may allow you to partition publications, e.g. per tenant
publication_id VARCHAR(250) NOT NULL,
-- unique message id, which can be used for deduplication or idempotency
message_id VARCHAR(250) NOT NULL,
-- the message type, e.g. `TransactionRecorded`
message_type VARCHAR(250) NOT NULL,
-- serialised message data, e.g. to JSON
data JSONB NOT NULL,
-- diagnostic information of when the message was scheduled
scheduled TIMESTAMP WITH TIME ZONE NOT NULL default (now()),
);

如果我们希望拥有多个订阅者并并行发布,那么我们还应该定义包含有关发布过程的信息的表。
CREATE TABLE subscribers
(
-- subscription name
subscription_id VARCHAR(250) PRIMARY KEY,
-- information about the position of the last processed message
last_processed_position INTEGER NULL,
-- inform which publication we're subscribing to
publication_id VARCHAR(250) NOT NULL,
);

我们可以使用这样的表结构订阅特定的消息序列(称为发布)。一旦我们处理了特定消息,使用它的位置,我们更新订阅的最后处理位置。多亏了这一点,即使过程在中间中断,我们也可以从先前已知的位置重试。

通常,此处理通过轮询进行。在定义的时间间隔内,我们正在查询消息表以获取要发布的新消息。例如:

SELECT
p.position, p.publication_id, p.message_id, p.message_type, [url=http://p.data/]p.data[/url], p.scheduled
FROM publication p
WHERE
p.publication_id = subscription_publication_id
p.last_processed_position > subscription_last_processed_position
LIMIT 100

这在很多情况下都很好,但是:
  • 找到合适的间隔并不容易。Marten Async Daemon会根据添加新消息的速度来调整它。但是,它仍然是一个近似值。
  • 轮询可能需要更多资源并具有更大的内存压力。
  • 它并不像看起来那样容易以可靠和有弹性的方式扩展它(例如,拥有多个节点、领导者选举等)

如果我们能在新消息到达时得到数据库通知,那就太好了,对吧?
如果您使用的是 Postgres,那么您可能是幸运的人!

Postgres 逻辑复制
我不确定您是否知道,但关系数据库的工作方式类似于事件存储。他们有一个称为“预写日志”(WAL)的概念。它是一个仅追加结构,记录事务处理期间的所有操作(插入、更新、删除)。当我们提交一个事务时,数据首先被附加到预写日志中。然后将所有操作应用于表、索引等。因此得名“Write-Ahead”:在其他更改之前将数据写入日志。所以从这个角度来看,表和索引只是预写日志的读取模型。
Postgres 是一个坚如磐石的数据库,具有许多出色的功能。其中之一是我们在Marten中使用的 JSON 支持,另一个是我们现在将仔细研究的逻辑复制。

逻辑复制将传统方法提升到了一个新的水平。我们不是发送备份数据库文件的原始二进制流,而是发送记录在预写日志中的更改流。它被命名为逻辑,因为它理解操作的语义,以及关于它正在复制的表的信息。它非常灵活;它可以为一个或多个表定义,过滤记录和复制数据子集。它可以通知您有关特定记录的更改。因此它要求复制表具有主键。

第一步是在 Postgres 配置(postgresql.conf)中启用逻辑复制:

wal_level = logical

如果是 Docker compose,您可以将其配置为:
version: "3"
services:
postgres:
image: postgres:14.5-alpine
ports:
-
"5432:5432"
environment:
- POSTGRES_DB=postgres
- POSTGRES_PASSWORD=postgres
command:
-
"postgres"
-
"-c"
-
"wal_level=logical"

与上述发件箱表类似,它具有发布-订阅语义。
要定义发布,我们需要调用以下 SQL 语句:
CREATE PUBLICATION outbox_pub FOR TABLE outbox;

我们还需要告诉 Postgres 它应该保留执行逻辑复制所需的预写日志条目,即使副本/订阅者与数据库断开连接也是如此。我们通过定义逻辑复制槽来做到这一点:
SELECT * FROM pg_create_logical_replication_slot('outbox_slot', 'pgoutput');

第一个参数只是一个手绘名称。另一个是我们要使用的逻辑解码插件的名称。仍然使用定义格式的二进制数据执行逻辑复制。它对人类甚至其他系统都不是超级可读的。为了能够翻译复制格式,我们可以使用逻辑解码插件。他们很多,来自最知名的;让我提示一下:
  • test-decoding - 允许读取 SQL 格式的复制数据。一个内置的;顾名思义,它是为快速测试而设计的。
  • wal2json - 将 WAL 变更集转换为 JSON 格式。
  • pg_output - 标准解码插件。

pg_output可能是最受欢迎的。它用于最知名和最成熟的工具Debezium。它也用于Npgsql,.NET Postgres 数据提供程序。我们将使用它来展示实践中的逻辑复制。

.NET 逻辑复制示例
假设我们使用的是上述发布表的简化版本:

CREATE TABLE outbox (
position BIGSERIAL PRIMARY KEY,
event_type VARCHAR(250) NOT NULL,
data JSONB NOT NULL
);

让我们开始塑造通用 API。我们的订阅可能如下所示:
public interface IEventsSubscription
{
IAsyncEnumerable<object> Subscribe(EventsSubscriptionOptions options, CancellationToken ct);
}
public record EventsSubscriptionOptions(
string ConnectionString,
string SlotName,
string PublicationName
);

我们只有一个方法,它采用订阅选项和CancellationToken来停止订阅。订阅返回IAsyncEnumerable,这是一个用于获取异步通知流的 .NET 接口。它非常适合发布-订阅语义。我们会得到一个消息流,因为它们可能有各种 CLR 类型,然后我们需要使用最小的公分母:基对象类。订阅选项用于将连接字符串传递给 Postgres 数据库以及插槽和发布名称,以了解我们正在订阅的内容。
现在让我们讨论实现我们的订阅方法的细节。首先是打开连接并获取复制槽。我们通过以下方式做到这一点:
var (connectionString, slotName, publicationName) = options;
await using var conn = new LogicalReplicationConnection(connectionString);
await [url=http://conn.open/]conn.Open[/url](ct);
var slot = new PgOutputReplicationSlot(slotName);

有了这个,我们可以开始订阅:

await foreach (var message in conn.StartReplication(slot, new PgOutputReplicationOptions(publicationName, 1), ct))
{
// TODO: PLACE PROCESSING LOGIC HERE
// Always call SetReplicationStatus() or assign LastAppliedLsn and LastFlushedLsn individually
// so that Npgsql can inform the server which WAL files can be removed/recycled.
conn.SetReplicationStatus(message.WalEnd);
// send the ACK to Postgres that we processed message
await conn.SendStatusUpdate(ct);
}

我们可以使用异步foreach循环。它将无限运行,直到与 Postgres 的连接断开,或者我们通过CancellationToken取消订阅。它将针对我们从 Postgres 获得的每个复制消息通知运行。pg_output产生多种类型的消息。由于我们的发件箱表是仅追加的,因此我们只关注插入部分。我们的完整类实现如下所示:
public class EventsSubscription: IEventsSubscription
{
    public async IAsyncEnumerable<object> Subscribe(EventsSubscriptionOptions options, [EnumeratorCancellation] CancellationToken ct)
    {
        var (connectionString, slotName, publicationName) = options;
        await using var conn = new LogicalReplicationConnection(connectionString);
        await conn.Open(ct);

        var slot = new PgOutputReplicationSlot(slotName);

        await foreach (var message in conn.StartReplication(slot, new PgOutputReplicationOptions(publicationName, 1), ct))
        {
            if (message is InsertMessage insertMessage)
            {
                yield return await InsertMessageHandler.Handle(insertMessage, ct);
            }

            conn.SetReplicationStatus(message.WalEnd);
            await conn.SendStatusUpdate(ct);
        }
    }
}

我们将把消息处理交给专用的InsertMessageHandler。

public static class InsertMessageHandler
{
    public static async Task<object> Handle(InsertMessage message, CancellationToken ct)
    {
        var columnNumber = 0;
        var eventTypeName = string.Empty;

        await foreach (var value in message.NewRow)
        {
            switch (columnNumber)
            {
                case 1:
                    eventTypeName = await value.GetTextReader().ReadToEndAsync();
                    break;
                case 2 when value.GetDataTypeName().ToLower() == "jsonb":
                {
                    var eventType = Type.GetType(eventTypeName);
                    if (eventType is null)
                        throw new ArgumentOutOfRangeException(nameof(eventType));

                    var @event = await JsonSerialization.FromJsonAsync(eventType, value.GetStream(), ct);

                    return @event!;
                }
            }

            columnNumber++;
        }

        throw new InvalidOperationException(
"You should not get here");
    }
}

我们需要遍历插入消息中的所有列。我们假设我们的订阅将从我们的标准化发件箱表中获取消息。我们需要从第二列获取消息类型并使用它来反序列化方法。我们假设我们将拥有 CLR 类型的程序集限定名称(这是最好的技术吗?从长远来看,您可能需要考虑消息版本控制,但这是另一回事)。反序列化消息后,我们可以将其返回并将其传播给订阅者。

var subscriptionOptions = new EventsSubscriptionOptions(ConnectrionString, "outbox_slot", "outbox_pub");
var subscription = new EventsSubscription();
await foreach (var readEvent in subscription.Subscribe(subscriptionOptions, ct))
{
// Handle upcoming event here, e.g.
// eventBus.Publish(readEvent);
}

事件该怎么办?您可以将其通过管道传输到内存中的处理程序,将其转发到消息总线,一般来说,做您的想象力和理智允许您做的事情。

概括
Postgres 在每个版本中都投入了大量资金,以使逻辑复制变得最健壮。