Ifood如何使用Golang实现每天消耗超过10亿条 Kafka 消息


Ifood它是一家巴西食品科技公司,每天交付超过100 万个订单,并且每年增长约 110%。作为一家食品技术公司,该平台的流量时间主要是在午餐和晚餐时间,而且在周末会更高。
 
我们有一个微服务,用于存储 Ifoo d的客户元数据(我们内部称其为帐户元数据),并且在高峰时段每分钟达到超过 200 万个请求。该系统由网络和移动应用程序以及许多内部团队调用以获取客户数据。数据存储在单个 DynamoDB 表中(13 亿个项目使用 757.3GB)。
我们在那里存储什么样的数据?每个 Ifood 用户的特点。一些特征示例:

  • - 订单总数的计数器
  • - 前 3 名最喜欢的菜肴
  • - 最喜欢的餐厅
  • - 用户所在的细分

 
主要问题
数据团队的管道,将数据从数据导出湖使用Databricks和Airflow对其进行处理,最后将其发送到帐户的元数据,包括大量的步骤,并 能很容易失败。经常有些事情会失败,因为我们并不总是对每一件事情都有最好的监控。这意味着它不可靠。
鉴于我们的数据质量是我们关注和兴趣的核心,这对我们作为用户配置文件团队来说是一个大问题。
这总是一个巨大的痛苦,所以我们开始环顾四周,思考如何改变基础设施的那部分。这意味着,每天更改数亿条记录的摄取。
 
当我们正在寻找替代方案并试图找出如何替换整个摄取过程时,Ifood 内部的一个 ML 团队正在构建一个新的很棒的工具:功能特征存储 (FS) 项目。总之,Feature Store 是一种非常轻松地提供和共享数据以支持 ML 应用程序、模型训练和实时预测的方式。一方面,FS 从某处(数据湖、数据仓库、Kafka 主题等)读取数据,将其聚合,进行某种处理或计算,然后在另一侧(API,在某些数据库中,Kafka主题等)。
当我们听到这个消息时,很明显这正是我们所需要的:一种集中、独特且非常有组织的方式来消费来自数据湖的数据。即使我们将 FS 用于与 ML 应用程序并不真正相关的事情,这对我们来说也是一个非常合适的用例。这会让我们的事情变得非常容易:他们会以某种方式导出数据,然后我们只需要将其保存在我们的数据库中。我们会将超级复杂且非常脆弱的管道更改为强大而可靠的机制。在与 Feature Store 团队交谈后,我们决定他们将通过 Kafka 主题将功能导出给我们。
但是,FS 无法批量导出功能,这意味着对于每个 Ifood 客户(大约 6000 万)和每个功能,都会导出一条消息。当时,我们有大约 20 个功能,但已经计划将其增加到 ~ 30 或 40。这意味着每天 60mi * 20 = 12 亿条消息,但几个月后这个数字可能会增加到 1.5bi 以上。
因此,我们每天应该能够消耗大约 1.5 条 Kafka 消息。
 
使用特征存储 (FS) 中的数据
正如我所说,FS 会将数据导出到 Kafka 主题中。我们将架构定义为如下所示:


  account_id: string 
  feature_name: string 
  feature_value: string 
  namespace: string 
  timestamp: int 
}

有了这个,我们可以创建一个消费者来监听 Kafka 主题并将功能保存在 DynamoDB 表中。
在 Dynamo 表中,我们将account_id用作分区键和namespace排序键。顾名思义,命名空间将帐户元数据系统提供的数据拆分到不同的上下文中。
这就是我们的表的样子:

| account_id | namespace | columns and its values… |
| 283a385e-8822–4e6e-a694-eafe62ea3efb | orders | total_orders: 3 | total_orders_lunch: 2 |
| c26d796a-38f9–481f-87eb-283e9254530f | rewards | segmentation: A |

消费者从 Kafka 主题中读取数据并将数据保存到 DynamoDB 中。
我们第一次使用 Java 实现了我们的消费者。它表现得相当好,但与我们需要的相差甚远:每个 pod/消费者每秒消耗 4k 功能。我们尝试了一些调整和不同的配置,但离 1.5 bi 还很远。
之后,我们使用goka尝试了不同的 Go 实现,goka是与 Kafka 交互的高级 Go 库。效果更好:每个 pod/消费者每秒消耗 8.5k 个功能。然而,离​​我们需要的还很远。
最后,仍然使用 Go,但使用sarama,我们能够实现一个工作器每分钟消耗 100 万个事件(每个 pod/消费者每秒消耗 20k 个功能。)。每个 pod/consumer 创建三个 goroutine 来处理从 Kafka 收到的消息。是的,我们做到了!这是第三次尝试,所以我们学习了一些关于如何正确配置 Kafka 客户端、设置正确的数据批读取大小等的知识。
 
Kafka实需要一些调整,这样Kafka的消费者非常快。主要参数:"fetch.min.bytes”, “auto commiting” and “setting max intervals”.