该项目演示了 Debezium 作为变更数据捕获 (CDC) 工具与 PostgreSQL 的集成。Debezium 捕获数据库中的更改并将其发布到 Apache Kafka,从而允许实时消费数据库更改。
更改数据捕获 ( CDC ) 是一种旨在识别和跟踪对数据库内数据所做的增量更改的技术。CDC 的主要目标是在数据库和下游应用程序之间提供低延迟、可靠且可扩展的数据复制。本质上,CDC将提交给数据库的更改视为事件流,并将它们异步传送到下游服务,包括数据插入、更新和删除。
- CDC流程首先将数据库与 CDC 集成,在设置期间捕获其初始状态。
- 随后,CDC通过将数据库的当前状态与其先前状态进行比较来识别数据的更改。
- 这些更改被分类为插入、更新或删除,并附有重要的元数据,例如时间戳、数据库用户、更改类型、事务 ID 和更改序列号。
- CDC存储这些更改并将其传播到需要与数据库同步的下游系统。
CDC 有多种类型:
- 1. 基于日志的CDC:从数据库事务日志中捕获更改。
- 2. 基于触发器的CDC:利用表上的触发器来捕获发生插入、更新或删除时的变化。
- 3.基于比较的CDC:定期比较源数据库和目标数据库的快照以识别更改。
- 4. 基于日志的 CDC:依靠数据库的审计跟踪来捕获更改。
- 5. 数据库轮询CDC:定期轮询数据库以识别自上次轮询以来的更改。
- 6.内置CDC:某些数据库(例如SQL Server)具有内置CDC功能,可以将其配置为自动跟踪更改。
CDC与Debezium
Debezium是一个基于Apache Kafka构建的分布式 CDC 系统,提供各种Kafka连接器,用于与不同数据库管理系统 (DBMS)集成。Debezium 的优势在于与 Kafka 的集成,提供可靠且一致的事件流。
Docker环境搭建
要使用 Debezium 实现CDC ,可以使用以下 docker-compose.yml 文件设置Docker环境:
version: '3' |
该 Docker 环境为 Apache Kafka、Zookeeper、PostgreSQL 数据库、Debezium 和 Schema Registry 配置了容器,从而为使用变更数据捕获技术捕获和管理 PostgreSQL 数据库中的变更提供了全面的解决方案:
- 1.Postgres 数据库中的变更由 Debezium 捕捉。
- 2.Debezium 将这些更改流式传输到 Kafka 主题。
- 3.使用 Avro 编码数据,模式由模式注册中心管理。
- 4.afka 代理促进 Debezium 与其他组件之间的通信。
要运行这些服务,请在包含 docker-compose 文件的目录中执行 `docker compose up -d`。
如果遇到任何问题,请查看 docker 日志。
Postgresql 设置
要与 Debezium 成功集成,必须启用 PostgreSQL 的逻辑解码功能,以提取提交到事务日志的更改。默认情况下,前向写入日志(WAL)段设置为 "复制",这意味着如果启动 Debezium 连接器,它将不会拥有对数据库所做所有更改的完整历史记录。为了解决这个问题,需要将 `wal_level` 设置为 `logical`。
在 Docker 环境中通过终端执行以下命令:
docker exec -it postgres_db vim /var/lib/postgresql/data/postgresql.conf
如果 Docker 环境中没有 Vim,也可以使用其他文本编辑器。或者,你也可以将文件复制到本地目录,进行必要的编辑,然后保存回 Docker 环境。
记住要重启 PostgreSQL 服务才能执行更改。
复制到本地计算机:
`docker cp postgres_db:/var/lib/postgresql/data/postgresql.conf /path/to/your/target/directory`
将其保存回 PostgreSQL 服务:
`docker cp postgresql.conf postgres_db:/var/lib/postgresql/data/postgresql.conf`
Debezium 配置
现在,让我们为 PostgreSQL 配置 Debezium 连接器。
确保执行此配置的数据库用户拥有所需的权限,如复制、创建和选择权限。
首先,为此在 PostgreSQL 数据库中创建一个表:
CREATE TABLE transactions ( |
创建表格后,定义 Debezium PostgreSQL 连接器设置。
下面的 JSON 配置演示了如何设置 Debezium 连接器:
{ |
在任何 API 客户端上执行此配置都会创建`transaction_connector`。该连接器还会根据数据库名称和目标表生成一个 Kafka 主题,例如`postgres.public.transactions`。
您可以通过检查可用的 Kafka 主题来验证这一点。
在 PostgreSQL 表中插入数据后,创建的连接器会将数据解析到指定的 Kafka 主题中。有关连接器字段的详细说明,请参阅用于 PostgreSQL 的 debezium 连接器。
使用Python更改数据捕获
有关代码实现,请参阅提供的GitHub[url=https://github.com/Limookiplimo/Change-data-capture]存储库[/url]以获取基于 Python 的解决方案。
前提要求:
- Docker
- Docker Compose
- Python 3.x
- Python libraries and packages (requirements.txt)
设置
- 启动Docker服务:运行以下命令启动PostgreSQL、Zookeeper、Kafka、Debezium和SchemaRegistry服务: docker compose up -d
- 配置 Debezium 连接器:编辑postgresql_connector.py文件以在connector_config 字典中包含正确的 PostgreSQL 服务 IP 地址。
- 创建 Debezium 连接器:运行以下命令创建 Debezium 连接器: python postgresql_connector.py
- 生成并加载示例数据 执行以下命令生成示例数据并将其加载到 PostgreSQL 数据库中: python python_job.py
- 检查Kafka主题 执行以下命令检查Kafka主题: python kafka_check_topics.py
- 使用 Kafka 中的更改运行以下命令以使用 Kafka 主题 postgres.public.transactions 中的更改: python kafka_read_topics.py
- 清理运行以下命令停止 Docker 服务: docker-compose down
重要笔记
- 确保 postgresql 数据库预写日志 (WAL) 级别值设置为logical。
- 确保安装了所需的 Python 库。您可以使用以下命令安装它们pip install -r requirements.txt:
- 根据您的 PostgreSQL 和 Kafka 配置调整 Python 脚本中的连接参数(postgresql_connector.py、python_job.py、kafka_read_topics.py)。
- 有关 Debezium 及其配置选项的更多信息,请参阅Debezium 文档。