Python中Debezium+PostgreSQL实现变更数据捕获


该项目演示了 Debezium 作为变更数据捕获 (CDC) 工具与 PostgreSQL 的集成。Debezium 捕获数据库中的更改并将其发布到 Apache Kafka,从而允许实时消费数据库更改。

更改数据捕获 ( CDC ) 是一种旨在识别和跟踪对数据库内数据所做的增量更改的技术。CDC 的主要目标是在数据库和下游应用程序之间提供低延迟、可靠且可扩展的数据复制。本质上,CDC将提交给数据库的更改视为事件流,并将它们异步传送到下游服务,包括数据插入、更新和删除。

  • CDC流程首先将数据库与 CDC 集成,在设置期间捕获其初始状态。
  • 随后,CDC通过将数据库的当前状态与其先前状态进行比较来识别数据的更改。
  • 这些更改被分类为插入、更新或删除,并附有重要的元数据,例如时间戳、数据库用户、更改类型、事务 ID 和更改序列号。
  • CDC存储这些更改并将其传播到需要与数据库同步的下游系统。

CDC 有多种类型:

  • 1. 基于日志的CDC:从数据库事务日志中捕获更改。
  • 2. 基于触发器的CDC:利用表上的触发器来捕获发生插入、更新或删除时的变化。
  • 3.基于比较的CDC:定期比较源数据库和目标数据库的快照以识别更改。
  • 4. 基于日志的 CDC:依靠数据库的审计跟踪来捕获更改。
  • 5. 数据库轮询CDC:定期轮询数据库以识别自上次轮询以来的更改。
  • 6.内置CDC:某些数据库(例如SQL Server)具有内置CDC功能,可以将其配置为自动跟踪更改。

CDC与Debezium
Debezium是一个基于Apache Kafka构建的分布式 CDC 系统,提供各种Kafka连接器,用于与不同数据库管理系统 (DBMS)集成。Debezium 的优势在于与 Kafka 的集成,提供可靠且一致的事件流。

Docker环境搭建
要使用 Debezium 实现CDC ,可以使用以下 docker-compose.yml 文件设置Docker环境:

version: '3'
services:
  postgres:
    image: postgres:latest
    container_name: postgres_db
    restart: always
    environment:
      POSTGRES_USER: user
      POSTGRES_PASSWORD: user_password
      POSTGRES_DB: demo_db
    ports:
      - "5432:5432"

  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.3
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-enterprise-kafka:5.5.3
    container_name: kafka
    restart: always
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT:
//kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT

  debezium:
    image: debezium/connect:1.4
    container_name: debezium
    restart: always
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http:
//schema-registry:8081
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http:
//schema-registry:8081
    depends_on: [kafka]
    ports:
      - 8083:8083

  schema-registry:
    image: confluentinc/cp-schema-registry:5.5.3
    container_name: schema_reg
    restart: always
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
      - SCHEMA_REGISTRY_LISTENERS=http:
//schema-registry:8081,http://localhost:8081
    ports:
      - 8081:8081
    depends_on: [zookeeper, kafka]


该 Docker 环境为 Apache Kafka、Zookeeper、PostgreSQL 数据库、Debezium 和 Schema Registry 配置了容器,从而为使用变更数据捕获技术捕获和管理 PostgreSQL 数据库中的变更提供了全面的解决方案:

  • 1.Postgres 数据库中的变更由 Debezium 捕捉。
  • 2.Debezium 将这些更改流式传输到 Kafka 主题。
  • 3.使用 Avro 编码数据,模式由模式注册中心管理。
  • 4.afka 代理促进 Debezium 与其他组件之间的通信。

要运行这些服务,请在包含 docker-compose 文件的目录中执行 `docker compose up -d`。
如果遇到任何问题,请查看 docker 日志。

Postgresql 设置
要与 Debezium 成功集成,必须启用 PostgreSQL 的逻辑解码功能,以提取提交到事务日志的更改。默认情况下,前向写入日志(WAL)段设置为 "复制",这意味着如果启动 Debezium 连接器,它将不会拥有对数据库所做所有更改的完整历史记录。为了解决这个问题,需要将 `wal_level` 设置为 `logical`。

在 Docker 环境中通过终端执行以下命令:

docker exec -it postgres_db vim /var/lib/postgresql/data/postgresql.conf

如果 Docker 环境中没有 Vim,也可以使用其他文本编辑器。或者,你也可以将文件复制到本地目录,进行必要的编辑,然后保存回 Docker 环境。
记住要重启 PostgreSQL 服务才能执行更改。

复制到本地计算机:
`docker cp postgres_db:/var/lib/postgresql/data/postgresql.conf /path/to/your/target/directory`

将其保存回 PostgreSQL 服务:
`docker cp postgresql.conf postgres_db:/var/lib/postgresql/data/postgresql.conf`

Debezium 配置
现在,让我们为 PostgreSQL 配置 Debezium 连接器。
确保执行此配置的数据库用户拥有所需的权限,如复制、创建和选择权限。

首先,为此在 PostgreSQL 数据库中创建一个表:

CREATE TABLE transactions (
    customer_name VARCHAR(255),
    invoice_number INT,
    item VARCHAR(255),
    quantity INT,
    price DECIMAL(10, 2)
);

创建表格后,定义 Debezium PostgreSQL 连接器设置。
下面的 JSON 配置演示了如何设置 Debezium 连接器:

{
    "name": "transaction-connector",
   
"config": {
        
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",

       
"plugin.name": "pgoutput",
       
"database.hostname": "your_postgres_service_ip_address",
       
"database.port": "5432",
       
"database.user": "user",
       
"database.password": "user_password",
       
"database.dbname": "demo_db",
       
"database.server.name": "postgres",
       
"key.converter.schemas.enable": "false",
       
"value.converter.schemas.enable": "false",
       
"transforms": "unwrap",
       
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        
"value.converter": "org.apache.kafka.connect.json.JsonConverter",

        
"key.converter": "org.apache.kafka.connect.json.JsonConverter",

       
"table.include.list": "public.transactions", change table schema if necessary
       
"slot.name": "dbz_transaction_slot"}
    }

在任何 API 客户端上执行此配置都会创建`transaction_connector`。该连接器还会根据数据库名称和目标表生成一个 Kafka 主题,例如`postgres.public.transactions`。
您可以通过检查可用的 Kafka 主题来验证这一点。

在 PostgreSQL 表中插入数据后,创建的连接器会将数据解析到指定的 Kafka 主题中。有关连接器字段的详细说明,请参阅用于 PostgreSQL 的 debezium 连接器。

使用Python更改数据捕获
有关代码实现,请参阅提供的GitHub[url=https://github.com/Limookiplimo/Change-data-capture]存储库[/url]以获取基于 Python 的解决方案。

前提要求:

  • Docker
  • Docker Compose
  • Python 3.x
  • Python libraries and packages (requirements.txt)

设置

  1. 启动Docker服务:运行以下命令启动PostgreSQL、Zookeeper、Kafka、Debezium和SchemaRegistry服务: docker compose up -d
  2. 配置 Debezium 连接器:编辑postgresql_connector.py文件以在connector_config 字典中包含正确的 PostgreSQL 服务 IP 地址。
  3. 创建 Debezium 连接器:运行以下命令创建 Debezium 连接器: python postgresql_connector.py
  4. 生成并加载示例数据 执行以下命令生成示例数据并将其加载到 PostgreSQL 数据库中: python python_job.py
  5. 检查Kafka主题 执行以下命令检查Kafka主题: python kafka_check_topics.py
  6. 使用 Kafka 中的更改运行以下命令以使用 Kafka 主题 postgres.public.transactions 中的更改: python kafka_read_topics.py
  7. 清理运行以下命令停止 Docker 服务: docker-compose down

重要笔记

  • 确保 postgresql 数据库预写日志 (WAL) 级别值设置为logical。
  • 确保安装了所需的 Python 库。您可以使用以下命令安装它们pip install -r requirements.txt:
  • 根据您的 PostgreSQL 和 Kafka 配置调整 Python 脚本中的连接参数(postgresql_connector.py、python_job.py、kafka_read_topics.py)。
  • 有关 Debezium 及其配置选项的更多信息,请参阅Debezium 文档