DoorDash使用 Kafka 和 Flink 构建可扩展的实时事件处理

22-08-15 banq

在 DoorDash,实时事件是深入了解我们业务的重要数据源,但构建能够处理数十亿实时事件的系统具有挑战性。事件由我们的服务和用户设备生成,需要处理并传输到不同的目的地,以帮助我们在平台上做出数据驱动的决策。举几个用例:
  • 几乎所有的事件都需要传输到我们的OLAP数据仓库进行业务分析。例如,Dasher 分配团队(Dasher 是我们对送货司机的称呼)依靠数据仓库中的分配数据来检测其分配算法中的任何错误。
  • 一些事件将由下游系统进一步处理。例如,我们的 ML 平台处理交付事件以生成实时特征,例如最近餐厅的平均等待时间。
  • 一些移动事件将与我们的时间序列指标后端集成以进行监控和警报,以便团队可以快速识别最新移动应用程序版本中的问题。例如,我们的 DoorDash 消费者应用程序中的任何结帐页面加载错误都需要发送到Chronosphere进行监控。

我们的遗留系统是如何工作的 从历史上看,DoorDash 有一些数据管道,它们从我们遗留的单体 Web 应用程序中获取数据,并将数据摄取到我们的主要数据仓库Snowflake中。每个管道的构建方式不同,只能处理一种事件,并且在数据最终进入数据仓库之前涉及多个跃点。示例如图 1 所示:



图 1:DoorDash 的旧数据管道

这种方法有几个问题:

  • 构建多个试图实现类似目的的管道是成本低效的。
  • 混合不同类型的数据传输并通过多个消息传递/排队系统而没有精心设计可观察性会导致操作困难。

这些导致高数据延迟、高成本和运营开销。 介绍 Iguazu 我们的事件处理系统 两年前,我们开始创建一个名为 Iguazu 的实时事件处理系统,以取代传统的数据管道,并随着数据量随着业务的增长而满足我们预期的以下事件处理需求:
  • 异构数据源和目的地:从各种数据源中提取数据,包括传统的单体 Web 应用程序、微服务和移动/Web 设备,并交付到不同的目的地,包括第三方数据服务。将可靠且低延迟的数据摄取到数据仓库中是重中之重。
  • 易于访问:使不同团队和服务能够轻松利用数据流并构建自己的数据处理逻辑的平台。
  • 端到端模式实施和模式演变:模式不仅提高了数据质量,而且还促进了与数据仓库和 SQL 处理的轻松集成。
  • 可扩展、容错且易于操作的小型团队:我们希望构建一个可以轻松扩展以满足业务需求且运营开销最小的系统。

为了实现这些目标,我们决定将战略从严重依赖 AWS 和第三方数据服务转变为利用可以定制并更好地与 DoorDash 基础设施集成的开源框架。Apache KafkaApache Flink等流处理平台在过去几年中已经成熟并变得易于采用。这些是我们可以用来自己构建东西的优秀构建块。

在过去的两年里,我们构建了实时事件处理系统,并将其扩展为每天处理数千亿个事件,交付率达到 99.99%。系统的整体架构如下图 2 所示。在以下部分中,我们将详细讨论系统的设计以及我们如何解决一些主要的技术挑战:
  • 使用统一的 API 和事件格式简化事件发布,以避免采用瓶颈
  • 为不同类型的数据消费者消费事件提供多种抽象
  • 在基础架构即代码环境中使用 Github 自动化和工作流程自动化入职流程



图2:Iguazu 整体系统架构图

简化和优化事件制作 构建处理系统的第一步是选择处理事件的最佳技术和方法。我们选择Apache Kafka作为我们的流数据发布/订阅系统,因为 Kafka 已被证明是统一异构数据源同时提供高吞吐量和高性能的出色解决方案。 利用和增强 Kafka Rest Proxy 我们希望每个 DoorDash 服务都能够轻松地为 Kafka 生成事件。一个明显的选择是创建一个 Kafka 客户端并将其与所有服务集成。但是,这种方法有一些缺点:

  • 每个服务都需要配置 Kafka 连接,这对于不熟悉 Kafka 的团队来说可能会出现问题并减慢采用速度
  • 跨不同服务的统一和优化的 Kafka 生产者配置将很困难
  • 对于移动和 Web 应用程序,直接连接到 Kafka 是不可行的


因此,我们决定利用Confluent 的开源 Kafka Rest Proxy进行事件生成。代理为我们提供了一个中心位置,我们可以在其中增强和优化事件生成功能,而无需与客户端应用程序协调。它通过 HTTP 接口提供对 Kafka 的抽象,消除了配置 Kafka 连接的需要,并使事件发布更加容易。由于跨不同客户端实例和应用程序的批处理能力,记录批处理对于降低代理的 CPU 利用率至关重要,它也得到了显着改进。 Kafka REST 代理提供了我们需要的所有开箱即用的基本功能,包括:
  • 支持不同类型的有效负载格式,包括 JSON 和二进制文件
  • 支持批处理,这对于减少客户端应用程序和 Kafka 代理的处理开销至关重要
  • 与Confluent 的模式注册表集成,可以使用模式验证和转换 JSON 有效负载


最重要的是,我们根据需要定制了其余代理,并添加了以下功能:
  • 生产多个 Kafka 集群的能力。生成多个集群的能力对我们来说至关重要,因为我们的事件迅速扩展到多个 Kafka 集群,并且事件到集群的映射是我们系统中的一个重要抽象。
  • 异步 Kafka 产生请求。使用异步生产无需在响应客户端请求之前先获得代理的确认。相反,响应会在有效负载经过验证并添加到 Kafka 生产者的缓冲区后立即发回。此功能大大减少了响应时间,并提高了批处理和系统的整体可用性。虽然当 Kafka 不可用时,此选项可能会导致少量数据丢失,但通过代理端生产者重试和密切监视从代理收到的确认来抵消风险。
  • 预取 Kafka 主题元数据并生成测试 Kafka 记录,作为 Kubernetes pod 就绪探测的一部分。此增强将确保代理 pod 将预热所有缓存和 Kafka 连接,以避免冷启动问题。
  • 支持 Kafka 标头作为代理生成请求负载的一部分。我们的事件有效负载格式依赖于事件信封,它是生产记录中 Kafka 标头的一部分。我们将在后面的部分中详细介绍我们的事件有效负载格式和序列化。


优化生产者配置 虽然 Kafka 的默认配置适用于需要高一致性的系统,但对于吞吐量和可用性很重要的非事务性事件发布和处理来说,它并不是最有效的。因此,我们微调了 Kafka 主题和代理生产者的配置,以实现高效率和吞吐量:
  • 我们使用 2 的复制因子和一个最小的同步副本。与三个副本的典型配置相比,这节省了磁盘空间并降低了 broker 在复制时的 CPU 使用率,同时仍然提供了足够的数据冗余。
  • 生产者配置为在分区的领导者(而不是追随者)持久化数据后立即接收来自代理的确认。这种配置减少了生产请求的响应时间。
  • 我们通过在 50 毫秒到 100 毫秒之间设置一个合理的逗留时间来利用 Kafka 的粘性分区器,这显着提高了批处理并降低了代理的 CPU 利用率。


总而言之,此调整将 Kafka 代理 CPU 利用率降低了 30% 到 40%。 在 Kubernetes 中运行 Rest 代理 事实证明,Kafka REST 代理很容易在我们自己的 Kubernetes 基础架构中构建和部署。它作为内部服务构建和部署,并利用了 DoorDash 基础设施提供的所有 CI/CD 流程。基于 CPU 利用率在服务上启用Kubernetes 水平 pod 自动缩放。这显着降低了我们的运营开销并节省了成本。



图 3:显示 Kubernetes 水平 pod 自动缩放对 Kafka Rest Proxy 生效

既然我们描述了简化和高效的事件生成,让我们在下一节中关注我们为促进事件消费所做的工作。 不同抽象的事件处理 如开头所述,伊瓜苏的一个重要目标是创建一个便于数据处理的平台。Flink 的分层 API架构完全符合这个目标。
Data Stream API 和 Flink SQL 是我们支持的两个主要抽象。 我们选择 Apache Flink 还因为它的低延迟处理、基于事件时间的处理的原生支持、容错以及与各种源和接收器的开箱即用集成,包括 Kafka、Reddis(通过三分之一-party OSS)、ElasticSearch 和 S3。
在 Kubernetes 中使用 Helm 进行部署 我们的平台提供了一个基本的 Flink docker 镜像,其中包含与 DoorDash Kubernetes 基础设施的其余部分良好集成的所有必要配置。
Flink 的高可用性设置和 Flink 指标是开箱即用的。为了更好的故障隔离和独立扩展的能力,每个 Flink 作业都以独立模式部署,作为一个单独的 Kubernetes 服务。 在使用数据流 API 开发 Flink 应用程序时,工程师首先会克隆一个 Flink 应用程序模板,然后添加自己的代码。
应用程序和 Flink 作业的配置(如并行度和任务管理器计数)将在terraform模板中定义。
在构建过程中,将使用组合的应用程序 jar 文件和我们内部的 Flink 基础 docker 镜像创建一个 docker 镜像。部署过程将同时采用 terraform 模板和应用程序 docker 映像,并根据生成的Helm Chart将应用程序部署在我们的 K8s 集群中的独立 Flink 作业集群中。该过程如下图所示:



图 4:使用 terraform 和 Helm 构建和部署 Flink 应用程序的过程

提供 SQL 抽象 虽然 Flink 的数据流 API 对于后端工程师来说并不难理解,但对于数据分析师和其他临时数据用户来说,它仍然是一个主要障碍。
对于这些用户,我们提供了一个平台来使用 SQL 以声明方式创建 Flink 应用程序,而无需担心基础设施级别的细节。该工作的详细信息可在此博客中找到。
在开发基于 SQL 的应用程序时,所有必要的处理逻辑和接线都被捕获在一个YAML文件中,该文件非常简单,每个人都可以阅读或创作。YAML 文件捕获了许多高级抽象,例如,连接到 Kafka 源并输出到不同的接收器。以下是此类 YAML 文件的示例: 要创建 Flink 作业,用户只需要使用 YAML 文件创建 PR。PR 合并后,将启动 CD 管道并将 YAML 文件编译为 Flink 应用程序并进行部署。 在以上两节中,我们介绍了在伊瓜苏发生和消费的事件。但是,如果没有统一的事件格式,生产者和消费者仍然很难相互理解。在下一节中,我们将讨论作为生产者和消费者之间协议的事件格式。

事件格式、序列化和模式验证 从一开始,我们就为在Iguazu 伊瓜苏制作和处理的事件定义了统一的格式。
统一的事件格式大大降低了消费事件的门槛,减少了事件生产者和消费者之间的摩擦。 所有事件都有一个标准的信封和有效负载。信封包含事件的上下文(例如创建时间和来源)、元数据(包括编码方法)和对模式的引用。有效负载包含事件的实际内容。
信封还包括作为 JSON blob 的非模式化事件属性。信封中的这个 JSON 部分有助于使部分事件模式变得灵活,其中的更改不涉及正式的模式演变过程。 从内部微服务产生的事件有效负载将经过模式验证和编码。无效的有效载荷将直接在生产者端丢弃。对于从移动/Web 设备产生的事件,它们采用原始 JSON 格式,我们使用单独的流处理应用程序进行模式验证和转换为模式化格式以供下游流程使用。

我们为事件生产者和消费者创建了序列化/反序列化库,以便与这种标准事件格式进行交互。在 Kafka 中,事件信封存储为 Kafka 记录头,有效负载存储为记录值。对于事件消费者,我们的库将解码 Kafka 标头和值并重新创建事件以供消费者处理。



图 5:显示从 Kafka 记录到 Event 的反序列化以及 Event 的概念如何在存储 (Kafka) 与应用程序运行时中表示

我们几乎所有的微服务都是基于GRPCProtobuf 的。所有事件都由 Protobuf 在集中共享的 Protobuf Git 存储库中定义。在 API 级别,我们对 Event 的定义是对 Protobuf 消息的包装,以使我们的微服务易于使用。

但是,对于大多数事件的最终目的地,Avro格式仍然比 Protobuf 得到更好的支持。对于这些用例,我们的序列化库负责将 Protobuf 消息无缝转换为 Avro 格式,这要归功于Avro 的 protobuf 库,并在需要时转换回 Protobuf 消息。

我们大量利用 Confluent 的模式注册表进行通用数据处理。所有事件都在模式注册表中注册。随着最近在 Confluent 模式注册表中引入的 Protobuf 模式支持,我们实现了使用 Protobuf 和 A​​vro 模式进行通用数据处理的能力。
我们一开始面临的一个挑战是我们如何强制和自动化模式注册。我们不想在生成事件时在运行时注册模式,因为:

  1. 它会在某个时间显着增加模式更新请求,从而导致模式注册表的可伸缩性问题。
  2. 任何不兼容的模式更改都会导致模式更新失败和客户端应用程序的运行时错误。

相反,最好在构建时注册和更新模式,以减少更新 API 调用量,并有机会在周期的早期捕获不兼容的模式更改。

我们创建的解决方案是将模式注册表更新集成为我们集中式 Protobuf Git 存储库的 CI/CD 流程的一部分。在拉取请求中更新 Protobuf 定义时,CI 流程将使用模式注册表验证更改。如果是不兼容的更改,CI 过程将失败。在 CI 通过并合并拉取请求后,CD 进程实际上将使用模式注册表注册/更新模式。CI/CD 自动化不仅消除了手动模式注册的开销,而且还保证:
  • 在构建时检测不兼容的架构更改,以及
  • 已发布的 Protobuf 类二进制文件与模式注册表中的模式之间的一致性。

在上面的部分中,我们通过 Iguazu 中的统一事件格式讨论了事件生成、消费和它们的绑定。在下一节中,我们将以低延迟和容错的方式描述 Iguazu 与其最重要的数据目的地 - 数据仓库的集成。

数据仓库集成 正如文章开头提到的,数据仓库集成是Iguazu 伊瓜苏的主要目标之一。
Snowflake 仍然是我们主要的数据仓库解决方案。我们希望事件以强一致性和低延迟交付给 Snowflake。 数据仓库集成是作为一个两步过程实现的。
在第一阶段,来自 Kafka 的 Flink 应用程序使用数据并以Parquet文件格式上传到 S3。此步骤有助于将摄取过程与 Snowflake 本身分离,因此任何与 Snowflake 相关的故障都不会影响流处理,并且鉴于 Kafka 的保留有限,数据可以从 S3 回填。
此外,在 S3 上拥有 Parquet 文件可以启用数据湖解决方案,我们稍后通过内部Trino安装对此进行了探索。 上传数据到 S3 的实现是通过 Flink 的StreamingFileSink完成的。
在作为 Flink 检查点的一部分完成 Parquet 文件的上传时,StreamingFileSink 保证了强一致性和一次性交付。StreamingFileSink 还允许在 S3 上进行自定义分桶,我们利用它在 S3 目录级别对数据进行分区。这种优化大大减少了下游批处理器的数据加载时间。

在第二阶段,数据通过Snowpipe从 S3 复制到 Snowflake 。由 SQS 消息触发,Snowpipe 可以在 S3 文件可用时立即加载数据。Snowpipe 还允许在复制过程中进行简单的数据转换。鉴于其声明性,与流处理相比,它是一个很好的选择。 一个重要的注意事项是,每个事件都有自己的用于 S3 上传的流处理应用程序和自己的 Snowpipe。因此,我们可以为每个事件单独扩展管道并隔离故障。 到目前为止,我们介绍了数据是如何从客户端到数据仓库的端到端流动的。在下一节中,我们将讨论伊瓜苏的运营方面,并了解我们如何使其自助服务以减轻运营负担。

努力打造自助平台 如上所述,为了实现故障隔离,Iguazu 中的每个事件都有自己的从 Flink 作业到 Snowpipe 的管道。然而,这需要更多的基础设施设置,并使操作成为挑战。 一开始,将新赛事加入Iguazu 伊瓜苏是一项需要大量支持的任务。DoorDash 严重依赖于基础设施即代码原则,大部分资源创建,从 Kafka 主题到服务配置,都涉及到不同terraform存储库的拉取请求。这使得自动化和创建高级抽象成为一项具有挑战性的任务。请参阅下图,了解加入新活动所涉及的步骤。



图 6:编排复杂步骤以创建从服务到 Snowflake 的管道的自动化到位

为了解决这个问题,我们与我们的基础架构团队合作,建立了正确的拉取审批流程,并使用 Git 自动化来自动化拉取请求。本质上,创建了一个 Github 应用程序,我们可以在其中以编程方式创建和合并来自我们作为控制器的服务之一的拉取请求。我们还利用了Cadence 工作流引擎,并将该流程实现为可靠的工作流。整个自动化将事件启动时间从几天缩短到几分钟。 为了使其更接近自助服务,我们使用Retool框架创建了 UI,供用户探索模式并从那里载入事件。用户可以使用正则表达式搜索架构,为事件选择正确的架构,启动工作流以创建必要的资源和服务,并有机会自定义 Snowflake 表和 Snowpipe。

我们 Iguazu 的最终目标是使其成为一个自助服务平台,用户可以在其中通过正确的抽象和最少的支持或人工干预自行参与活动。



图 7:显示模式探索 UI,用户可以在其中深入了解主题和版本。



图 8:显示用于查看/自定义 Snowflake 表架构和 Snowpipe 的 UI

学习和未来的工作 我们发现创建具有平台思维方式的事件处理系统很重要。不同技术相互叠加的临时解决方案不仅效率低下,而且难以扩展和运营。
选择正确的框架并创建正确的构建块对于确保成功至关重要。Apache Kafka、Apache Flink、Confluent Rest Proxy 和 Schema 注册表被证明具有可扩展性和可靠性。研究和利用这些框架的最佳点显着减少了开发和操作这个大规模事件处理系统所需的时间。

为了使系统用户友好且易于采用,需要正确的抽象。从声明式 SQL 方法、事件的无缝序列化到高级用户引导 UI,我们努力使其成为一个简单的过程,以便我们的用户可以专注于他们自己的业务逻辑,而不是我们的实现细节。

我们还想在Iguazu 伊瓜苏实现很多目标。我们已经开始了在Iguazu 伊瓜苏之上构建客户数据平台的旅程,我们可以轻松地以自助方式转换和分析用户数据。会话化是我们想要解决的一个重要用例。

为了能够会话化大量数据,我们开始通过利用 Kubernetes StatefulSet、持久化卷和探索使用Spinnaker的新部署方式来增强对 Flink 状态处理的支持。与数据湖更好地集成是我们前进的另一个方向。结合 SQL 抽象和正确的表格式,从流处理应用程序直接访问数据湖可以使用同一应用程序进行回填和重放,并提供另一种替代方案Lambda 架构