Debezium for PostgreSQL 基础知识


Arijit Mazumdar 撰写的文章《超越 Debezium for PostgreSQL 基础知识 - 第 1 部分》深入探讨了用于捕获和流式传输数据库更改的开源框架 Debezium 如何与 PostgreSQL 集成。以下是本文涵盖的要点:

Debezium 是非常流行的开源框架之一,它可以实时捕获和传输数据库更改。该领域的一个相对年轻的参与者于 2016 年在 Red Hat 开始工作,由 Confluent、Zalando 等许多其他参与者积极管理。Debezium 支持大多数流行的数据库。

Debezium 的核心是建立在 Kafka Connect 框架之上,默认情况下它将更改存储在 Kafka 中,尽管还存在许多其他模式。

在本系列中,我将尝试解释 Debezium 如何与 PostgreSQL 数据库(可以说是最流行的开源数据库)配合使用,以及在生产中部署它的一些设计注意事项。这只是我在过去几个月中积累的理解的一次自私尝试。

Debezium 概述

  • Debezium旨在实时捕获和传输来自各种数据库的变化,并利用 Kafka Connect 框架将这些变化存储在 Kafka 中。
  • 它支持多种数据库,尤其以与最流行的开源数据库之一 PostgreSQL 的兼容性而著称。

可以使用相对较少的配置来部署 Debezium。但是,这可能不足以满足许多生产级环境的需求。在了解 Debezium 配置的复杂性之前,我们应该尝试了解 Postgres 如何捕获并以流的形式发出更改。在下一部分中,我们将深入介绍 Debezium、Kafka Connect 和 Kafka。

PostgreSQL 流如何变化
简而言之,像 Debezium 这样的客户端使用复制槽和发布来读取 Postgres WAL 中的更改。但这些是什么呢?Postgres 将所有更改(CRUD、DDL 操作)记录在 WAL 中,尽管保留 WAL 的主要目的是其他。

使用预写式日志 (WAL),数据库可通过在将所有更改应用于数据库本身之前记录到数据库来确保数据库事务的持久性和一致性。这确保了即使在系统崩溃或故障的情况下,也可以通过重放 WAL 记录将数据库恢复到一致状态。

好处是,使用 WAL,我们可以及时存储所有更改,这些更改可用于跟踪发生的更改。Postgres 可以在内部使用 64 位整数维护 WAL 中的位置,该整数显示为两个用斜杠分隔的十六进制数,例如16/B374D848。这称为LSN(长序列号)。

对于有 Kafka 背景的人来说,这被用作消费者偏移量,用于跟踪 Kafka 消费者在 Kafka 主题中的位置。

接下来是 Postgres 如何发出更改:
这是使用逻辑复制进行的,该逻辑复制使用了发布和复制槽。PostgreSQL发布是一种将更改从 PostgreSQL 数据库流式传输到消费者(例如 Debezium)的机制。发布包含从一个或多个表生成的一组经过过滤的更改事件。想象一下报纸之类的发布。您选择要关注(跟踪更改)的部分(表格),发布会根据您的选择提供相关文章(更改事件)。每个发布中的数据都根据发布规范进行过滤,发布规范可以由 PostgreSQL 数据库管理员或 Debezium 连接器创建。

复制槽充当消费者(如 Debezium)从数据库接收更改流的专用连接。这确保即使 Debezium 遇到中断,所有更改也会在重新连接后被捕获并交付。复制槽用于确保对数据库所做的所有更改都被捕获并流式传输给消费者,即使在 Debezium 中断期间也是如此。这是通过复制槽存储最新的 LSN 以及已消费到复制槽的消费者数量来实现的。它类似于消​​费者组为 Kafka 消费者工作的方式。

重要的是要了解发布和复制槽确实会对 PostgreSQL 中的 WAL(预写日志)产生影响。当复制槽处于活动状态时,PostgreSQL 将保留订阅者所需的所有相关 WAL 段,即使它们比保留策略更旧。这可确保订阅者在断开连接并稍后重新连接时仍能赶上错过的更改。因此,对于 Debezium 来说,定期更新最后提交的 LSN 很重要,这样数据库就可以清理 WAL 中的旧记录。

所有这些更改都根据其复制标识保存在 WAL 中。副本标识是 PostgreSQL 的一项功能,它决定了在逻辑复制中如何处理 UPDATE 和 DELETE 操作,例如在将 Debezium 与 PostgreSQL 结合使用时。如果表有主键,则默认情况下它将成为副本标识,否则为无。在将 Debezium 与 PostgreSQL 结合使用时,确保正确设置副本标识非常重要。否则 Postgres 将不允许对表执行 UPDATE 和 DELETE 操作。

Debezium 的默认行为是创建发布和复制槽,这些槽将从数据库的所有表中使用。然而,这可能不是最佳设计,尤其是对于大型且繁忙的数据库。在下文中,我将尝试解释可能的调整选项。这将解释我刚才提到的东西。

PostgreSQL 中的变更捕获机制

  • PostgreSQL 使用预写日志 (WAL)来记录所有更改(CRUD 和 DDL 操作),以确保持久性和一致性。此日志记录允许在系统发生故障时进行恢复。
  • 使用长序列号(LSN)来跟踪更改,其功能类似于 Kafka 中的消费者偏移量。

逻辑复制

  • 更改通过逻辑复制从 PostgreSQL 发出,它使用发布和复制槽:
    • 发布定义了哪些表的变化被流式传输,类似于选择报纸中的部分内容。
    • 复制槽作为专用连接,确保即使在中断期间也能捕获所有更改。

对 WAL 的影响

  • 活动复制槽导致 PostgreSQL 保留 WAL 段的时间比平时更长,从而允许像 Debezium 这样的订阅者在重新连接后赶上错过的更改。

配置选项

  • 本文讨论了使用 PostgreSQL 配置 Debezium 的两种主要方法:
    1. 使用 Debezium Connector:此方法允许根据指定的配置自动创建发布和复制槽。
    2. 直接数据库设置:用户可以直接在 PostgreSQL 数据库中手动创建发布和复制槽。


1、设置发布和副本槽
Debezium 文档中详细记录了启用 Debezium 的数据库设置。我们将深入介绍设置发布和复制槽的方法,无论是使用 Debezium 连接器还是直接进入数据库。我将介绍这两种方法。

首先,我创建了一个小型数据库,其中包含一些表和行并在 PostgreSQL 数据库 docker 容器中运行。此图像由 Debezium 团队提供,其中为 Debezium 设置了一切。请参阅此处

选项 - 1: 使用 Debezium 连接器
以下是相关重要属性:

  1. autocreate.mode - 默认值 all_tables 假定将创建新的刊物(如 publication.name 中所述),如果已经存在,则使用相同的值。 更好的方法是使用禁用或更好的使用过滤(我的最爱),debezium 将根据 schema.include.list、schema.exclude.list、table.include.list 和 table.exclude.list 创建或更新出版物
  2. publication.name - 出版物的名称。 如果更改此值,将创建新的出版物(除非 publication.autocreate.mode = 禁用),例如 my_publications
  3. slot.name - 插槽名称,例如 my_slot

方案 - 2:直接在数据库中创建
另一种方法是直接在数据库中创建这些对象。 我还引入了一些系统表,通过这些表我们可以获得用于发布和复制插槽的有趣信息。

创建发布者:

exercises=<strong>CREATE PUBLICATION my_publications FOR TABLE cd.bookings, cd.members  </strong>
exercises=<strong>WITH (publish = 'insert, update, delete');</strong>

 
exercises=<strong>ALTER PUBLICATION my_publications ADD TABLE cd.facilities;</strong>

exercises=<strong>SELECT *</strong>
 FROM pg_publication 
 WHERE pubname = 'my_publications';

  oid  |     pubname     | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate | pubviaroot
-------+-----------------+----------+--------------+-----------+-----------+-----------+-------------+------------
 16441 | my_publications |       10 | f            | t         | t         | t         | f           | f
(1 row)

exercises=<strong>SELECT *</strong>
 FROM pg_publication_tables pt
 WHERE pt.pubname = 'my_publications';

     pubname     | schemaname | tablename  |                                  attnames                                  | rowfilter
-----------------+------------+------------+----------------------------------------------------------------------------+-----------
 my_publications | cd         | bookings   | {bookid,facid,memid,starttime,slots}                                       |
 my_publications | cd         | facilities | {facid,name,membercost,guestcost,initialoutlay,monthlymaintenance}         |
 my_publications | cd         | members    | {memid,surname,firstname,address,zipcode,telephone,recommendedby,joindate} |
(3 rows)


使用订阅创建复制插槽

exercises=<strong>SELECT * FROM pg_create_logical_replication_slot('my_slot', 'pgoutput');</strong>

exercises=<strong>SELECT * FROM pg_replication_slots;</strong>

 slot_name |  plugin  | slot_type | datoid | database  | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase
-----------+----------+-----------+--------+-----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------+-----------
 my_slot   | pgoutput | logical   |  16388 | exercises | f         | f      |            |      |          744 | 0/1BD30D8   | 0/1BD3110           | reserved   |               | f
(1 row)

exercises=<strong>SELECT * FROM pg_stat_replication_slots;</strong>
 slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes | stats_reset
-----------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+-------------
 my_slot   |          0 |           0 |           0 |           0 |            0 |            0 |          0 |           0 |
(1 row)

从 pg_replication_slots 的输出中可以观察到有趣的几点:

  • active_pid 为空,active 为假,因为没有人从复制插槽中消费
  • confirmed_flush_lsn 具有订阅者可以从 WAL 中消费的最旧 LSN(本例中为 0/1BD3110)。 在此之前的所有更改都有可能从 WAL 中删除。

部署 Debezium Connector 并监控
让我们在数据库中执行更多活动并部署 Debezium 连接器(我选择kcctl )。连接器配置可以在这里这里找到。


选项——1:使用 Debezium Connector

exercises=<strong>SELECT *</strong>
FROM pg_publication
where pubname='my_publications_opt1';
  oid  |       pubname        | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate | pubviaroot
-------+----------------------+----------+--------------+-----------+-----------+-----------+-------------+------------
 16446 | my_publications_opt1 |       10 | f            | t         | t         | t         | t           | f
(1 row)

exercises=<strong>SELECT *</strong>
FROM pg_publication_tables pt
WHERE pt.pubname = 'my_publications_opt1';
       pubname        | schemaname | tablename  |                                  attnames     | rowfilter
----------------------+------------+------------+----------------------------------------------------------------------------+-----------
 my_publications_opt1 | cd         | facilities | {facid,name,membercost,guestcost,initialoutlay,monthlymaintenance}         |
 my_publications_opt1 | cd         | members    | {memid,surname,firstname,address,zipcode,telephone,recommendedby,joindate} |

exercises=<strong>select * from pg_replication_slots where slot_name='my_slot_opt1';</strong>
  slot_name   |  plugin  | slot_type | datoid | database  | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase
--------------+----------+-----------+--------+-----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------+-----------
 my_slot_opt1 | pgoutput | logical   |  16388 | exercises | f         | t      |        676 |      |          747 | 0/1BED778   | 0/1BED778           | reserved   |               | f
(1 row)

exercises=<strong>SELECT * FROM pg_stat_replication_slots where slot_name='my_slot_opt1';</strong>
  slot_name   | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes | stats_reset
--------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+-------------
 my_slot_opt1 |          0 |           0 |           0 |           0 |            0 |            0 |          2 |        3416 |
(1 row)

<strong>After a few insert / updates in the database</strong>
exercises=<strong>select * from pg_replication_slots where slot_name='my_slot_opt1';</strong>
  slot_name   |  plugin  | slot_type | datoid | database  | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase
--------------+----------+-----------+--------+-----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------+-----------
 my_slot_opt1 | pgoutput | logical   |  16388 | exercises | f         | t      |        715 |      |          748 | 0/1BED7B0   | 0/1C08A88           | reserved   |               | f
(1 row)
  • 在 pg_publication_tables 表中,只添加了 facilities 和 members,而 booking 则如预期的那样丢失了。
  • 在表中的一些相关事务(Debezium 订阅的表)之后,confirmed_flush_lsn 被移动到了不同的值,通过这种方式,Debezium 通知 Postgres 清理不需要的旧 WAL。

选项 - 2:直接在数据库中设置

exercises=<strong>SELECT *</strong>
FROM pg_publication_tables pt
WHERE pt.pubname = 'my_publications';
     pubname     | schemaname | tablename  |                                  attnames                                  | rowfilter
-----------------+------------+------------+----------------------------------------------------------------------------+-----------
 my_publications | cd         | bookings   | {bookid,facid,memid,starttime,slots}                                       |
 my_publications | cd         | facilities | {facid,name,membercost,guestcost,initialoutlay,monthlymaintenance}         |
 my_publications | cd         | members    | {memid,surname,firstname,address,zipcode,telephone,recommendedby,joindate} |
(3 rows)

exercises=<strong>SELECT *</strong>
FROM pg_publication_tables pt
WHERE pt.pubname = 'my_publications';
     pubname     | schemaname | tablename  |                                  attnames                                  | rowfilter
-----------------+------------+------------+----------------------------------------------------------------------------+-----------
 my_publications | cd         | bookings   | {bookid,facid,memid,starttime,slots}                                       |
 my_publications | cd         | facilities | {facid,name,membercost,guestcost,initialoutlay,monthlymaintenance}         |
 my_publications | cd         | members    | {memid,surname,firstname,address,zipcode,telephone,recommendedby,joindate} |
(3 rows)

exercises=<strong>select * from pg_replication_slots where slot_name='my_slot';</strong>
 slot_name |  plugin  | slot_type | datoid | database  | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase
-----------+----------+-----------+--------+-----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------+-----------
 my_slot   | pgoutput | logical   |  16388 | exercises | f         | t      |        787 |      |          753 | 0/1C08C08   | 0/1C08C40           | reserved   |               | f
(1 row)

exercises=<strong>SELECT * FROM pg_stat_replication_slots where slot_name='my_slot';</strong>
 slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes | stats_reset
-----------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+-------------
 my_slot   |          0 |           0 |           0 |           0 |            0 |            0 |          0 |           0 |
(1 row)

#after a few transactions in tables
exercises=<strong>SELECT * FROM pg_stat_replication_slots where slot_name='my_slot';</strong>
 slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes | stats_reset
-----------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+-------------
 my_slot   |          0 |           0 |           0 |           0 |            0 |            0 |          3 |        1136 |
(1 row)

值得注意的是,虽然在两个选项中都创建了相同的 2 个主题,但在选项-1 中,过滤是在数据库层设置发布时进行的,而在选项-2 中,过滤是在 Kafka 连接器层进行的。 这意味着在方案-2 中,与预订相关的更改被发送到 Debezium,并在那里被过滤掉。 如果预订表非常繁忙,我们就会向 Debezium 发送不必要的流量,而这些流量无论如何都会被过滤掉。

另一个需要注意的重要事项是,虽然没有在直接在数据库中创建出版物时进行描述,但我们可以添加过滤条件(where 子句)或选择特定列。 通过这种方式,我们可以限制从数据库到 Debezium 及其他数据库的数据传输。 在 Debezium 内部使用 SMT 也可以实现类似的功能。