无服务器管道:将MySQL事件流传输到Knative Services - zhaw


我们最初的想法是利用数据库触发机制并编写可以监听这些事件并将它们发布到Kafka总线的组件。实际上,我们开始编写针对PostgreSQL的代码来实现这一目标,但后来我们遇到了Debezium项目,该项目基本上解决了同样的问题,虽然不是在相同的上下文中,但是具有更成熟的代码库和对多个数据库系统的支持。重新发明轮子是没有意义的,因此目标变成了如何最好地整合Debezium和Knative。

经过一些阅读和实验,我们最终得到了图1所示的解决方案。可以看出,它包含一个连接到Kafka Connect服务的MySQL数据库; 这使用Debezium MySQL插件Kafka总线上产生消息; Knative的Apache Kafka事件源连接到其清单中指定的Kafka总线上的特定主题; 最后,Event Display打印出它从Apache Kafka事件源接收的消息。
Kafka Connect--一个将外部系统连接到Kafka集群的框架 - 通过插件架构支持各种插件,其中一个是Debezium。Debezium实际上包含多个插件,每个插件支持一个数据库。在我们的例子中,我们使用了MySQL插件,它通过标准端口与MySQL数据库通信,并在数据库的二进制日志(binlog)上运行。binlog本质上是数据库上发生的所有操作的有序记录。Debezium连接器筛选binlog,根据需要进行过滤,并从binlog文件中的条目生成事件,然后将其传输到Kafka主题,为每个数据库表创建一个新主题。
最后,为了使用新创建的主题中的消息,我们利用Knative的Apache Kafka事件源,它提供了Knative和Kafka之间的集成,并且可以使用来自一个或多个Kafka主题的消息。然后,它将消息转发给Knative服务(标准FaaS术语中的函数),该服务可以对数据库更改起作用。在我们的例子中,我们有一个简单的事件显示服务,它只是将收到的事件转储到输出。

部署MySQL
为简单起见,我们将Docker镜像用于Debezium提供的预先配置的MySQL服务器(可在此处找到)并将其作为我们的Kubernetes集群中的pod部署在新的命名空间中。配置更改并不是那么重要 - 必须启用binlog支持并确保可以远程访问数据库。有关所需配置的更多信息,请在此处详细说明。请注意,Debezium并未真正对数据库托管位置施加约束; 例如,您可以使用自我管理或云管理的MySQL实例,它可以同时与多个数据库一起运行。Debezium还支持监控其他数据库系统,如MongoDBPostgreSQLOracleSQL Server。

部署Kafka和Kafka Connect集群
下一步是使用Debezium MySQL插件部署Apache Kafka集群和Kafka Connect集群,以便使用MySQL服务器中的事件。我们使用了Strimzi,它提供了一种简单的方法来启动和运行Kafka和Kafka Connect。具体来说,我们按照0.11.2版本的说明在同一名称空间中部署Kafka和Kafka Connect。
Kafka提供了一个配置设置,用于控制在生产者尝试写入不存在的主题时是否可以自动创建主题。在标准的Strimzi Kafka配置中,默认情况下禁用此功能。这意味着Kafka Connect无法创建主题,因此无法监控我们的数据库实例。为了解决这个问题,我们不得不在Kafka Strimzi清单中设置auto.create.topics.enable选项true。有关在此处向Kafka添加配置参数的更多信息
在部署Kafka Connect时,我们使用了包含Debezium MySQL连接器的定制Docker镜像

Kafka Connect和Debezium​​​​​​​
如上所述,Kafka Connect使用用于MySQL的Debezium连接器读取MySQL数据库的二进制日志 - 这将按照数据库提交的相同顺序记录所有操作,包括更改表的模式或更改存储在其中的数据表格。MySQL连接器为binlog中的每个行级INSERT,DELETE和UPDATE操作生成更改事件。数据库中每个表的更改事件将发布到单独的Kafka主题,另外一个主题用于记录​​对数据库模式的更改。
Kafka Connect公开了一个RESTful API,您可以通过它管理要监视的数据库列表。要监视新数据库,将/connectors使用JSON格式的必要配置对Kafka Connect的资源发出POST请求。这里解释 JSON配置。
此时,我们已准备好告诉Debezium监控我们的MySQL数据库。我们向Kafka Connect REST API发送了一个POST请求,其中包含有关数据库的信息,并获得一个HTTP 201响应,指示请求已成功接受。
为了测试配置,我们监视Kafka Connect日志以查看它是从我们的数据库实例接收数据,我们查看了Kafka Connect创建的Kafka主题,以便在我们手动执行一些SQL INSERT,UPDATE和DELETE时看到正在生成消息查询。

使用Kafka事件源为Knative
随着Kafka Connect / Debezium和Kafka的成立,我们准备将它连接到Knative。在撰写本文时,我们使用了最新版本的Knative - v0.5,并按照这些说明进行安装。
为了使用Debezium创建的Kafka主题中的消息,我们使用了Knative的Apache Kafka事件源,它提供了Knative和Apache Kafka之间的集成。Kafka事件源是一个Knative资源,可以订阅一个或多个Kafka主题并开始接收消息。然后,它可以开始将收到的消息中继到Knative 服务通道 - 在Knative术语中称为接收器。可以在此处查看Kafka事件源的示例清单文件。
最后,为了对收到的消息做一些事情,我们创建了Event DisplayKnative服务,打印出它从Kafka事件源接收的消息。通过这种方式,我们能够对MySQL数据库进行更改,从而在Knative上触发了服务。​​​​​​​
值得注意的是,此解决方案有点过于简单,仅用于演示目的。Knative支持更复杂的事件管道,可能涉及通过通道连接在一起的多个服务,应该在更真实的世界环境中使用。更多关于Knative事件的信息