如何从MySQL中将变化的事件数据发布到Kafka?


如何使用Debezium向Kafka提交MySQL的更改数据事件? CDC(变更数据捕获)是将OLTP数据库系统与其他系统(如数据仓库,缓存,Spark或Hadoop)互连的最佳方式之一。
Debezium是由Red Hat开发的一个开源项目,旨在通过允许您从各种数据库系统(例如MySQL,PostgreSQL,MongoDB)中提取更改并将其推送到Apache Kafka来简化此过程。
在本文中,我们将了解如何使用Debezium从MySQL二进制日志中提取事件。

首先,您需要一个特定于数据库的Debezium连接器才能提取重做日志(例如Oracle,MySQL)或预写日志(例如PostgreSQL)。
您还需要运行Kafka,以便可以推送提取的日志事件并使其可用于企业系统中的其他服务。Debezium不需要Apache ZooKeeper,但Kafka需要它,因为它依赖于ZK的分布式共识以及线性化保证。

必须运行以下Docker容器安装Debezium:


> docker run -it --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:0.5

> docker run -it --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:0.5

> docker run -it --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.5

> docker run -it --name kafka-connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:0.5

> docker run -it --name kafka-watcher --link zookeeper:zookeeper debezium/kafka:0.5 watch-topic -a -k dbserver1.inventory.customers

使用Docker列出容器:
docker ps -a

使用bash,您需要创建一个新连接器:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.whitelist": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'


kafka-watcher是以交互模式启动的,这样我们就可以在控制台中看到Debezium捕获的CDC日志事件。

测试
现在,如果我们使用root用户和debezium密码连接到MySQL Docker容器,我们可以发出各种SQL语句并检查kafka-watcher容器控制台输出。
插入新customer行时:

INSERT INTO <code>inventory</code>.<code>customers</code>
(
    <code>first_name</code>,
    <code>last_name</code>,
    <code>email</code>)
VALUES
(
    'Vlad',
    'Mihalcea',
    'vlad@acme.org'
)

在kafka-watcher,我们现在可以找到以下JSON条目:

{  
   "payload":{  
     
"before":null,
     
"after":{  
         
"id":1005,
         
"first_name":"Vlad",
         
"last_name":"Mihalcea",
         
"email":"vlad@acme.org"
      },
     
"source":{  
         
"name":"dbserver1",
         
"server_id":223344,
         
"ts_sec":1500369632,
         
"gtid":null,
         
"file":"mysql-bin.000003",
         
"pos":364,
         
"row":0,
         
"snapshot":null,
         
"thread":13,
         
"db":"inventory",
         
"table":"customers"
      },
     
"op":"c",
     
"ts_ms":1500369632095
   }
}

当after对象显示新插入的值时,before是null,op属性值是c,这意味着它是一个CREATE事件。

更新customer行时:

UPDATE <code>inventory</code>.<code>customers</code>
SET
    <code>email</code> = 'vlad.mihalcea@acme.org'
WHERE
    <code>id</code> = 1005


我们现在可以找到以下日志事件:


    "payload":{  
     
"before":{  
         
"id":1005,
         
"first_name":"Vlad",
         
"last_name":"Mihalcea",
         
"email":"vlad@acme.org"
      },
     
"after":{  
         
"id":1005,
         
"first_name":"Vlad",
         
"last_name":"Mihalcea",
         
"email":"vlad.mihalcea@acme.org"
      },
     
"source":{  
         
"name":"dbserver1",
         
"server_id":223344,
         
"ts_sec":1500369929,
         
"gtid":null,
         
"file":"mysql-bin.000003",
         
"pos":673,
         
"row":0,
         
"snapshot":null,
         
"thread":13,
         
"db":"inventory",
         
"table":"customers"
      },
     
"op":"u",
     
"ts_ms":1500369929464
   }
}

发出DELETE语句时:

DELETE FROM <code>inventory</code>.<code>customers</code>
WHERE id = 1005;


kafka-connectDocker容器正在记录以下事件:


    "payload":{  
     
"before":{  
         
"id":1005,
         
"first_name":"Vlad",
         
"last_name":"Mihalcea",
         
"email":"vlad.mihalcea@acme.org"
      },
     
"after":null,
     
"source":{  
         
"name":"dbserver1",
         
"server_id":223344,
         
"ts_sec":1500370394,
         
"gtid":null,
         
"file":"mysql-bin.000003",
         
"pos":1025,
         
"row":0,
         
"snapshot":null,
         
"thread":13,
         
"db":"inventory",
         
"table":"customers"
      },
     
"op":"d",
     
"ts_ms":1500370394589
   }
}

op属性值d,这意味着我们有一个删除日志事件,after对象现在是null。before对象是在它被删除之前捕获的数据库行状态。