将PostgreSQL变更通过Debezium+Redis Stream同步到Redis中


Debezium 是一个很好的工具,用于捕获数据库上发生的行级更改并将这些更改流式传输到我们选择的代理。
我们的目标是监听 PostgreSQL 更改并通过 Debezium 服务器将它们流式传输到 Redis 流。
通常将 Debizum 与 Kafka 一起使用,在我们的例子中,我们将通过使用 Redis Streams 来保持轻量级。

无需任何额外配置即可设置 Redis。

为了将 PostgreSQL 与 Debezium 一起使用,必须更改 postgreSQL 上的配置。

我们将在 postgreSQL 上使用的配置如下

listen_addresses = '*'
port = 5432
max_connections = 20
shared_buffers = 128MB
temp_buffers = 8MB
work_mem = 4MB
wal_level = logical
max_wal_senders = 3

正如我们所见,我们使用 PostgreSQL 的logical_decoding。
从文档中:
logical_decoding逻辑解码是将数据库表的所有持久更改提取为连贯的、易于理解的格式的过程,无需详细了解数据库的内部状态即可解释该格式。
在 PostgreSQL中,逻辑解码是通过将描述存储级别更改的 预写日志的内容解码为特定于应用程序的形式(例如元组流或 SQL 语句)来实现的。

我们还将为 PostgreSQL 创建一个命名空间和一个表。命名空间和表将是监听更改的目标对象。

#!/bin/bash
set -e
 
psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL
  create schema test_schema;
  create table test_schema.employee(
          id  SERIAL PRIMARY KEY,
          firstname   TEXT    NOT NULL,
          lastname    TEXT    NOT NULL,
          email       TEXT    not null,
          age         INT     NOT NULL,
          salary         real,
          unique(email)
      );
EOSQL

Debezium 必须能够与 PostgreSQL 服务器以及 redis 服务器交互。配置应该如下:

debezium.sink.type=redis
debezium.sink.redis.address=redis:6379
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=postgres
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=postgres
debezium.source.database.dbname=postgres
debezium.source.database.server.name=tutorial
debezium.source.schema.whitelist=test_schema
debezium.source.plugin.name=pgoutput

通过检查配置,我们可以看到我们拥有 Debezium 与 PostgreSQL 数据库通信的必要信息

由于此演示将涉及三个不同的软件组件,因此docker compose 将派上用场:

version: '3.1'
 
services:
  redis:
    image: redis
    ports:
      - 6379:6379
    depends_on:
      - postgres
  postgres:
    image: postgres
    restart: always
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
    volumes:
      - ./postgresql.conf:/etc/postgresql/postgresql.conf
      - ./init:/docker-entrypoint-initdb.d
    command:
      - "-c"
      -
"config_file=/etc/postgresql/postgresql.conf"
    ports:
      - 5432:5432
  debezium:
    image: debezium/server
    volumes:
      - ./conf:/debezium/conf
      - ./data:/debezium/data
    depends_on:
      - redis

通过使用Compose,我们能够在同一个网络上启动三个不同的软件组件。这有助于组件之间通过使用Compose上指定的服务的dns名称进行交互。同时,我们之前创建的配置文件也被挂载到Docker容器中。

为了让堆栈运行,我们将执行以下命令

$ docker compose up

由于它已经启动并运行,我们现在可以开始监听事件了。

我们将登录 Redis 并开始监听任何可能的数据库更新:

$ docker exec -it debezium-example-redis-1 redis-cli
> xread block 1000000 streams tutorial.test_schema.employee $

这会阻塞到我们从流中接收到一个事件。
如果我们检查流名称,我们应该看到 {server-name}.{schema}.{table} 的模式。这将允许消费者仅订阅感兴趣的更改。

$ docker exec -it debezium-example-postgres-1 psql postgres postgres
> insert into test_schema.employee (firstname,lastname,email,age,salary) values ('John','Doe 1','john1@doe.com',18,1234.23);
> \q

如果我们检查 redis 会话,我们应该看到我们收到了来自 Redis 流的事件:
127.0.0.1:6379> xread block 1000000 streams tutorial.test_schema.employee $
1) 1) "tutorial.test_schema.employee"
   2) 1) 1)
"1663796657336-0"
         2) 1)
"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"default\":0,\"field\":\"id\"}],\"optional\":false,\"name\":\"tutorial.test_schema.employee.Key\"},\"payload\":{\"id\":1}}"
            2)
"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"default\":0,\"field\":\"id\"},{\"type\":\"string\",\"optional\":false,\"field\":\"firstname\"},{\"type\":\"string\",\"optional\":false,\"field\":\"lastname\"},{\"type\":\"string\",\"optional\":false,\"field\":\"email\"},{\"type\":\"int32\",\"optional\":false,\"field\":\"age\"},{\"type\":\"float\",\"optional\":true,\"field\":\"salary\"}],\"optional\":true,\"name\":\"tutorial.test_schema.employee.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"default\":0,\"field\":\"id\"},{\"type\":\"string\",\"optional\":false,\"field\":\"firstname\"},{\"type\":\"string\",\"optional\":false,\"field\":\"lastname\"},{\"type\":\"string\",\"optional\":false,\"field\":\"email\"},{\"type\":\"int32\",\"optional\":false,\"field\":\"age\"},{\"type\":\"float\",\"optional\":true,\"field\":\"salary\"}],\"optional\":true,\"name\":\"tutorial.test_schema.employee.Value\",\"field\":\"after\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"version\"},{\"type\":\"string\",\"optional\":false,\"field\":\"connector\"},{\"type\":\"string\",\"optional\":false,\"field\":\"name\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"ts_ms\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Enum\",\"version\":1,\"parameters\":{\"allowed\":\"true,last,false,incremental\"},\"default\":\"false\",\"field\":\"snapshot\"},{\"type\":\"string\",\"optional\":false,\"field\":\"db\"},{\"type\":\"string\",\"optional\":true,\"field\":\"sequence\"},{\"type\":\"string\",\"optional\":false,\"field\":\"schema\"},{\"type\":\"string\",\"optional\":false,\"field\":\"table\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"txId\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"lsn\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"xmin\"}],\"optional\":false,\"name\":\"io.debezium.connector.postgresql.Source\",\"field\":\"source\"},{\"type\":\"string\",\"optional\":false,\"field\":\"op\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"ts_ms\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"id\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"total_order\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"data_collection_order\"}],\"optional\":true,\"field\":\"transaction\"}],\"optional\":false,\"name\":\"tutorial.test_schema.employee.Envelope\"},\"payload\":{\"before\":null,\"after\":{\"id\":1,\"firstname\":\"John\",\"lastname\":\"Doe 1\",\"email\":\"john1@doe.com\",\"age\":18,\"salary\":1234.23},\"source\":{\"version\":\ŕ.9.5.Final\",\"connector\":\"postgresql\",\"name\":\"tutorial\",\"ts_ms\":1663796656393,\"snapshot\":\"false\",\"db\":\"postgres\",\"sequence\":\"[null,\\\�\\\"]\",\"schema\":\"test_schema\",\"table\":\"employee\",\"txId\":738,\"lsn\":24289128,\"xmin\":null},\"op\":\"c\",\"ts_ms\":1663796657106,\"transaction\":null}}"
(10.17s)
127.0.0.1:6379> 

您可以在GitHub 上找到源代码。