使用 Kafka 和 Debezium 调度数百万条消息 - Yotpo


Yotpo使用Apache Kafka和Debezium为每分钟数百万条消息实施了高度可扩展且可靠的预定消息解决方案:
实现大规模分布式系统并不容易,因为传统的数据库调度无法扩展。此外,在使用微服务架构时,它变得更加困难,因为您继承了所有分布式系统问题,例如数据不一致、双重写入 和 域边界问题。
在本文中,我将分享我在 Yotpo 如何使用 Apache Kafka 和 Debezium CDC 为每分钟数百万条消息实施高度可扩展且可靠的预定消息解决方案的过程。
在接下来的几章中,我将深入探讨选择 Debezium CDC 来将消息调度到 Apache Kafka。
 
动机
作为 Yotpo 业务的一部分,我们需要向最终用户发送电子邮件。不仅如此,它可以在未来某个日期以每天数百万的规模进行,在高峰时间甚至可以在几分钟内达到数百万。因此,挑战在于安排以这种规模发送的电子邮件。
 
传统调度解决方案之旅
鉴于上述要求,我开始研究用于安排电子邮件请求的传统解决方案,并很快发现它们不符合要求。我们如何存储所有这些电子邮件,并在这种规模的确切交付日期安排它们——确保所有电子邮件都在给定的交付日期执行?让我们仔细看看旅程以及我在此过程中解决的问题。

  • RabbitMQ——延迟消息交换

起初我们以为,这正是我们所需要的!但是在阅读插件 README 后不久,我们发现它无法扩展。
此外,这个解决方案意味着我们不会在我们的系统中保存任何消息,从而导致缺乏可见性,并且无法监控和分析这些数据。
  • Quartz 作业调度器

使用 Quartz 作为调度程序和作业执行工具不会扩展到数以万计,而且看起来很复杂,尤其是当您的任务/作业很短时。
  • 数据库作为 Apache Kafka 消息的延迟消息存储

  1. 将预定消息插入数据库
  2. 当调度器定时器启动时,它需要查询数据库中未交付的条目并获取这些条目的锁,以确保这些条目只会产生一次
  3. 生产到卡夫卡
  4. 更新并提交到数据库
  5. 消费者服务消费消息并发送电子邮件

缺点
  1. 双写问题——我们试图写入两个不同的系统,即数据库和Kafka,导致数据不一致
  2. 两阶段提交不是一种选择,因为它不可扩展

 
Debezium CDC 来救援
Debezium 是一个用于变更数据捕获的开源分布式平台。启动它,将它指向您的数据库,您的应用程序就可以开始响应其他应用程序提交到您的数据库的所有插入、更新和删除操作。
利用 Debezium 将预定消息从数据库流式传输到 Kafka:
我们使用 Apache Kafka 作为我们的流媒体平台和微服务之间的通信。您可能知道,Apache Kafka 是一个高度可扩展的分布式平台,能够处理大量消息,但它没有提供延迟消息的解决方案。因此,我们必须找到一种解决方案来存储所有延迟消息,直到它们准备好进行处理(即调度)。
既然 Debezium 在 Yotpo 被广泛用作变更数据捕获的基础设施,为什么不使用它来解决普通 CDC 以外的问题呢?让我解释一下我们是如何做到的……
如果我们将消息存储在数据库中,我们只需要在这些消息准备好进行处理时将它们流式传输回 Kafka。由于这是 Debezium 所擅长的,更重要的是在大规模方面 - 我们发现它是完美的选择!
  1. HTTP 服务器将消息插入到数据库中,并带有交付时间
  2. 一个单独的应用程序将作为一个简单的 cron 调度程序,当交付时间 < NOW 并且更新准备好时更新延迟的消息- 标记为 true 以在下一个 cron 周期中被忽略
  3. Debezium 将通过 CDC 捕获那些更新的条目(仅那些特定的更新,稍后会详细介绍),并将这些消息流式传输到 Apache Kafka 以供执行消费者使用

Debezium 连接器:
{
  "name": "delayed-email-message",
 
"config": {
   
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
   
"tasks.max": "1",
   
"database.hostname": "mysql",
   
"database.port": "3306",
   
"database.user": "root",
   
"database.password": "rootpass",
   
"database.server.id": "184054",
   
"database.server.name": "dbserver1",
   
"database.whitelist": "emails",
   
"database.history.kafka.bootstrap.servers": "kafka:9093",
   
"database.history.kafka.topic": "delayed.emails.history",
   
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
   
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
   
"key.converter.schemas.enable": "false",
   
"value.converter.schemas.enable": "false",
   
"table.whitelist": "emails.delayed_messages",
   
"transforms": "Reroute, filter, unwrap",
   
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
   
"transforms.unwrap.drop.tombstones": "true",
   
"transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
   
"transforms.Reroute.topic.regex": "(.*)(delayed_messages)$",
   
"transforms.Reroute.topic.replacement": "email.execution",
   
"transforms.filter.type": "io.debezium.transforms.Filter",
   
"transforms.filter.language": "jsr223.groovy",
   
"transforms.filter.topic.regex": "email.execution",
   
"transforms.filter.condition": "value.op == \"u\" && value.before.is_ready == false && value.after.is_ready == true"
  }
}

连接器在delay_messages表(第 19 行)上配置,并且只会响应将is_ready=false更改为is_ready=true的更新操作。
让我们仔细看看转换部分:
  • ExtractNewRecordState — 用于消息扁平化
  • 重新路由——将默认的 Debezium Kafka 主题更改为“email.execution”
  • 过滤器 — 使用更新操作过滤消息,将其 is_ready 标志从 false 更改为 true。这可确保 Debezium 连接器仅为此特定更新操作生成消息,而忽略任何其他操作

 

执行消费者
一个简单的 Kafka 消费者,它使用来自“email.execution”主题的电子邮件消息,并简单地发送电子邮件。此外,您需要根据您的规模优化消费者配置和 Kafka 主题分区。

好处

  • 分离调度和执行之间的关注点,以提高可扩展性
  • 调度程序执行不会影响数据库性能
  • 数据一致性——没有双写问题
  • 容错
  • 灵活性
  • 在数据库中存储您需要的任何数据
  • 出于分析/管理目的查询此数据
  • 获得 Apache Kafka 的好处
  • 水平刻度
  • 消息排序
  • 批量消费以提高性能
  • 流媒体功能,以及更多……

缺点
  • 复杂性——我们有 2 个额外的基础设施需要监控和维护

结果
系统在生产中运行了一段时间,在高峰期,同时调度了50K封邮件,结果如下:
  • 调度程序查询耗时 1.25 秒
  • 大约 250 毫秒的延迟,直到 debezium 从数据库中捕获所有 50k 封电子邮件并生成到 Kafka 主题——这是从调度程序应用程序和电子邮件消费者应用程序的日志中收集的

调度非常快,根据结果,我们每分钟可以收到大约 240 万条消息!这是通过对 Debezium 和 Mysql 使用默认配置。