使用Postgres 16 备用服务器实现CDC逻辑复制

对于变更数据捕获 (CDC) 的用户来说,Postgres 版本 16(今年 9 月发布)中最令人兴奋的功能之一是支持从备用服务器进行逻辑复制。您现在可以将 Debezium 等 CDC 工具指向副本服务器,而不是连接到主服务器,这从负载分布的角度来看非常有趣。

什么是 Postgres 逻辑复制?
让我们从基础知识开始。复制是将一台数据库服务器的所有数据同步到一台或多台其他服务器的过程,以确保高可用性 (HA)(如果主服务器发生故障,其中一台备用服务器可以接管)和负载分配,因为副本可以服务读取请求。当谈到Postgres中的复制时,它支持两种从主到副本的连续复制:流式复制和逻辑复制。

  • 流式复制(有时也称为物理复制)将预写日志 (WAL) 的所有段从主服务器传播到一台或多台备用服务器(或副本)。它包括所有数据库和表,并自动传播架构更改,从而生成主服务器的精确副本。这使得它成为确保数据高可用性的绝佳选择,特别是在同步模式下使用时,其中只有在成功复制后才会提交将事务写入主服务器。

  • 相比之下,逻辑复制的工作方式更具选择性,“根据数据对象的复制标识(通常是主键)来复制数据对象及其更改”。它在 Postgres 服务器上的单个数据库的上下文中运行,您可以在非常细粒度的级别上控制哪些模式或表应成为复制流的一部分。

逻辑复制提供了灵活的发布/订阅模型,超越了物理复制的基本主/只读副本方案。在此模型中,发布者节点是一个 Postgres 实例,它公开一个或多个发布,描述应复制哪些表。出版物还允许您限制哪些列、和操作(插入,更新, 或者删除)应予以传播。设置完成后,一个或多个订阅者节点将从发布中检索更改。

订阅者可以消费来自多个发布者的更改,也可以本身就是发布者。您可以在订阅者上执行本地写入事务,这意味着逻辑复制可以构成多主架构的基础。与流式复制不同,您可以在不同版本的 Postgres 实例之间使用逻辑复制,使其成为零停机版本升级的有用工具。

缺点是存在一些限制:最重要的是,逻辑复制不会传播 DDL 更改,即您必须自己确保源和目标的数据库模式同步。目前还不支持复制序列、(物化)视图和大型对象。

除了将数据从一个 Postgres 实例复制到另一个实例之外,逻辑复制如此有趣的原因还在于逻辑解码插件的概念。它们控制用于逻辑复制消息的数据格式,并允许外部(即非 Postgres)消费者订阅复制流。这就是 Debezium 等工具为 Postgres 实现变更数据捕获 (CDC) 的方式,支持将数据复制到数据仓库和数据湖、缓存失效、搜索索引的低延迟同步等用例。


为什么在备用服务器上使用逻辑复制?
使用备用服务器逻辑复制的第一个也是最重要的原因是负载分配。当您有许多逻辑复制槽和相应的客户端时,这可能会在主数据库服务器上造成过重的负载。在这种情况下,在主服务器的一个甚至多个只读副本上设置复制槽可能会很有帮助,从而在多台计算机上分配资源消耗(CPU、网络带宽)。
此外,在备用服务器上设置复制槽是启用故障转移槽的一个步骤,即逻辑复制使用者能够在主服务器发生故障并随后升级备用服务器后恢复处理。虽然 Postgres(尚)不支持故障转移槽,但您可以使用一些粘合代码自行实现它们,正如我们将在本博客系列的第二部分中讨论的那样。
最后,在某些情况下,人们只是不喜欢逻辑复制客户端的想法,特别是像 Debezium 这样的非 Postgres 工具,直接连接到他们的操作数据库。

无论正确与否,在只读副本上设置逻辑复制只会让您高枕无忧。

另一方面,还有一些需要考虑的含义。一是端到端延迟略有增加:由于更改从主服务器传输到备用服务器,然后传输到客户端,因此比将复制使用者直接连接到主服务器所需的时间要长一些。另一方面是只读副本正是只读的。虽然 CDC 通常不需要对数据库进行写访问,但也有一些例外。对于 Debezium,您将无法使用其增量快照实现,因为这需要将水印事件插入信令表中。

此外,您也不能使用心跳功能,该功能允许连接器在连接器正在捕获的任何表都没有更改事件进入的情况下更新其重新启动偏移量。如果您想使用这些功能,那么您需要在主服务器上创建复制槽。

使用 Debezium 进行备用逻辑复制
Postgres 逻辑复制的优点在于,不仅其他 Postgres 实例可以充当复制流的使用者,而且其他客户端也可以订阅复制槽以从 Postgres 获取实时更改事件源。Debezium为包括 Postgres 在内的许多数据库提供 CDC 支持,利用该支持通过 Apache Kafka 公开更改事件流,而且还通过其Debezium Server组件向其他类型的消息传递和流平台(例如 AWS Kinesis、Apache Pulsar、 NATS 等。因此,让我们快速测试如何使用 Debezium 从 Postgres 备用流传输更改。

请注意,我们需要使用最新的 Debezium 版本(上周发布的2.5.0.Beta 1 ),以便从备用服务器传输更改。当我第一次测试这个时,事情不太正常,因为连接器使用了该功能pg_current_wal_lsn()为了获得当前的 WAL 位置。不过,这仅在主服务器上可用。因此,我利用这个机会在很长一段时间内做出了我的第一个小小的Debezium 贡献,将其更改为调用pg_last_wal_receive_lsn()相反,当连接到备用设备时。非常感谢团队快速合并并包含在 Beta1 版本中!

作为此实验的游乐场,我创建了一个简单的Docker Compose 文件,该文件启动 Kafka 和 Kafka Connect 作为 Debezium 的运行时环境。

有趣的事实:这在 KRaft 模式下使用 Kafka,即不需要 ZooKeeper 进程。美好时光!如果您想继续操作,请确保您已安装 Docker,并在 Amazon RDS 上设置了 Postgres 主节点和备用节点,如第一部分中所述。然后克隆可解码/示例存储库并启动演示环境,如下所示:

git clone git@github.com:decodableco/examples.git decodable-examples
cd decodable-examples/postgres-logical-replication-standby
docker compose up

为了在 RDS 上将 Debezium 与 Postgres 一起使用,建议使用PG输出逻辑解码插件。它是标准解码插件,也用于逻辑复制到其他 Postgres 实例。该插件需要设置一个发布,该发布配置应为哪些表发布哪些类型的更改。通常,Debezium 会自动设置发布(类似于逻辑复制槽)。不幸的是,当从备用服务器接收更改时,不支持此功能,因为发布(与复制槽不同)只能在主服务器上创建。Debezium 不知道主数据库,因此您需要在设置连接器之前手动创建该发布:


primary> CREATE PUBLICATION my_publication FOR ALL TABLES;
CREATE PUBLICATION

必须在主服务器上为备用复制槽创建发布似乎有些不一致,而且在操作方面也不理想,但这种要求可能有充分的理由。

接下来我们看一下连接器配置。这是通过基于 JSON 的配置文件完成的,如下所示:

{
 "name" : "inventory-connector",
 
"config" : {
   
"connector.class" : "io.debezium.connector.postgresql.PostgresConnector",
   
"tasks.max" : "1",
   
"database.hostname" : "<Your Postgres Stand-By>",
   
"database.port" : "5432",
   
"database.user" : "<User>",
   
"database.password" : "<Password>",
   
"database.dbname" : "<Database>",
   
"topic.prefix" : "dbserver1",
   
"plugin.name" : "pgoutput",
   
"publication.name" : "my_publication",
   
"poll.interval.ms" : "100"
 }
}

将其应用到您自己的环境时,根据需要调整数据库主机、用户名、密码和数据库名称。要应用此配置,可以调用 Kafka Connect 的 REST API。但如果您像我一样并且倾向于忘记所有确切的端点 URL,那么请看一下kcctl (是的,泰迪​​熊表情符号是名称的一部分),我将在下面使用它。它是 Kafka Connect 的命令行客户端,可以非常轻松地创建连接器、重新启动和停止连接器等。遵循kubectl的语义,应用配置文件如下:

kcctl apply -f postgres-connector.json
Created connector inventory-connector

让我们看一下连接器及其状态:

kcctl describe connector inventory-connector
Name:       inventory-connector
Type:       source
State:      RUNNING
Worker ID:  172.21.0.3:8083
Config:
  connector.class:    io.debezium.connector.postgresql.PostgresConnector
  database.dbname:    <Database>
  database.hostname:  <Host>
  database.password:  <Password>
  database.port:      5432
  database.user:      <User>
  name:               inventory-connector
  plugin.name:        pgoutput
  poll.interval.ms:   100
  publication.name:   my_publication
  tasks.max:          1
  topic.prefix:       dbserver1
Tasks:
  0:
    State:      RUNNING
    Worker ID:  172.21.0.3:8083
Topics:
  dbserver1.public.some_data

确认连接器正在运行后,让我们在主数据库中进行快速更新,并检查 Kafka 中从备用实例摄取的相应更改事件:

docker run --tty --rm \
     --network postgres-logical-replication-standby_default \
     quay.io/debezium/tooling:1.2 \
     kcat -b kafka:9092 -C -o beginning -q -t dbserver1.public.some_data | jq .payload

{
  "before": null,
 
"after": {
   
"id": 1,
   
"short_text": "c4ca4",
   
"long_text": "3a3c3274941c83e253ebf8d2438ea5a2"
  },
 
"source": {
   
"version": "2.5.0.Beta1",
   
"connector": "postgresql",
   
"name": "dbserver1",
   
"ts_ms": 1702469846436,
   
"snapshot": "first_in_data_collection",
   
"db": "inventory",
   
"sequence": "[null,\�\"]",
   
"schema": "public",
   
"table": "some_data",
   
"txId": null,
   
"lsn": 406746824704,
   
"xmin": null
  },
 
"op": "r",
 
"ts_ms": 1702469849341,
 
"transaction": null
}
...

这些是连接器启动时发出的快照事件。让我们对主数据库上的一条记录进行更新:

primary> UPDATE some_data SET short_text='hello' WHERE id = 1;

此后不久,相应的更改事件应该出现在 Kafka 主题中:

...
  "before": {
   
"id": 1,
   
"short_text": "c4ca4",
   
"long_text": "3a3c3274941c83e253ebf8d2438ea5a2"
  },
 
"after": {
   
"id": 1,
   
"short_text": "hello",I
   
"long_text": "3a3c3274941c83e253ebf8d2438ea5a2"
  },
...

此时,您可以使用Apache FlinkDecodable Kafka 源连接器连接该更改事件流,以将其输入实时流处理管道。

走向故障转移插槽
在过去几年中,Postgres 对逻辑复制的支持已经有了长足的进步。但仍然缺少一件事:故障转移插槽。主服务器上的逻辑复制插槽不会传播到备用服务器上。这意味着,当主服务器意外宕机时,任何插槽都必须在升级后在新的主服务器上重新创建。遗憾的是,这会导致变更事件流中出现缺口,因为在创建新槽位之前发生的任何数据变更都会被遗漏。客户将被迫回填整个数据集(即 Debezium 术语中的快照),以确保没有数据丢失。

关于增加插槽故障转移支持的讨论最早可以追溯到 Postgres 9.6 版本。最近,Patroni为这个问题添加了自己的解决方案,EnterpriseDB也发布了pg_failover_slots,这是一个用于插槽故障转移的Postgres扩展。至于Postgres自己何时添加这一功能,我们拭目以待(正如本演讲所暗示的,可能会在Postgres 17中添加)。在此之前,在 pg_failover_slots 扩展不可用的情况下,例如在亚马逊 RDS 上,逻辑复制备用槽可以让你创建自己版本的故障转移槽。具体做法是在主用和备用机上创建两个相应的插槽,并使用 pg_replication_slot_advance()函数(在 Postgres 11 中添加)保持两者同步。复制用户首先会连接到主用的插槽。故障切换后,当备用服务器晋升为主服务器时,它会重新连接到该插槽。

为此,通过调用定期向前移动备用槽至关重要pg_replication_slot_advance()来自主槽的确认刷新 LSN,即 WAL 中已由主槽的使用者处理和确认的最新位置。否则,备用槽将保留越来越多的 WAL,而且消费者在故障转移后也会收到大量重复事件。

要做到这一点,关键是要通过调用 pg_replication_slot_advance(),定期将备用槽向前移动,并使用主槽已确认的刷新 LSN,即主槽消费者已处理和确认的 WAL 中的最新位置。否则,备用插槽将保留越来越多的 WAL,消费者也会在故障切换后收到大量重复事件。

例如,这可以通过 cron 作业来实现,或者在 AWS 上运行时,通过计划的 Lambda 函数来实现。该作业将通过 pg_replication_slots 视图定期检索主服务器上已确认的槽位的刷新 LSN:

primary> SELECT
  slot_name,
  confirmed_flush_lsn
FROM pg_replication_slots;
+--------------+---------------------+
| slot_name    | confirmed_flush_lsn |
|--------------+---------------------|
| primary_slot | 5F/C501098          |
+--------------+---------------------+

然后,备用插槽将被提前至该 LSN:

standby> SELECT * FROM pg_replication_slot_advance('failover_slot', '5F/C501098');
+---------------+------------+
| slot_name     | end_lsn    |
|---------------+------------|
| failover_slot | 5F/C501098 |
+---------------+------------+

备用插槽的前进频率取决于故障切换后愿意接受的重复数量:备用插槽与主插槽的距离越近,从一个插槽切换到另一个插槽时的重复数量就越少。请注意,备用插槽的前进速度绝不能超过主插槽的确认 LSN。否则,在故障切换后从备用插槽读取时,事件就会丢失。具体来说,在设置备用插槽时,它的 LSN 很可能比主插槽确认的 LSN 要新,因此首先必须同步两个插槽。为此,需要等待主插槽确认下一个 LSN,确保该 LSN 已复制到备用插槽,然后将备用插槽推进到该 LSN。

总之
包起来
从 Postgres 备用服务器进行逻辑复制一直是人们期待已久的功能,它最终随 Postgres 16 一起提供。它不仅允许您构建 Postgres 副本链(一台备用服务器订阅另一台备用服务器),而且还允许您构建 Postgres 副本链(一台备用服务器订阅另一台备用服务器)。 Postgres 客户端(例如 Debezium)不再局限于只能连接到主 Postgres 实例。这对于负载分配或您不希望 CDC 工具直接连接到主数据库的情况非常有用。

这里的最后一个缺失部分是对故障转移插槽的完全支持,为此您仍然需要一个单独的扩展(pg_failover_slots)或通过手动保持主和备用上的两个插槽同步来实现您自己的方法。很高兴在未来的 Postgres 版本中看到对此的官方支持。

最后,如果您想了解有关备用逻辑复制的更多信息,请查看 Postgres 社区中一些优秀人士的帖子: