DoorDash如何使用 Apache Kafka 和 Elasticsearch 构建更快的索引?


保持愉快的在线订购体验包括确保大型搜索索引在规模上保持有效。对于 DoorDash 来说,这是一个特别的挑战,因为商店、商品和其他数据的数量每天都在增加。在这种负载下,重新索引所有更改并更新我们的搜索数据库可能需要长达一周的时间。 
我们需要一种快速的方法来索引我们平台的所有可搜索数据,以改进产品发现,确保我们为消费者提供所有可用的订购选项。此外,该项目还将提高我们平台上的实验速度,因此我们可以更快地提高搜索性能。 
我们的解决方案涉及构建一个新的搜索索引平台,该平台在我们的数据源上使用增量索引。我们基于三个开源项目构建了这个平台,Apache KafkaApache FlinkElasticsearch
 
DoorDash 的搜索索引问题 
我们的旧索引系统既不可靠也不可扩展,而且速度很慢。可靠的索引系统将确保商店和商品的变化实时反映在搜索索引中。增量索引有助于更快地刷新数据,构建新索引以在更短的时间内引入新的分析器和附加字段,最终有助于改进检索。
DoorDash 中来自新业务垂直领域的团队希望建立自己的搜索体验,但在索引搜索数据时不想重新发明轮子。因此,我们需要一个即插即用的解决方案来改善新的搜索体验,同时又不减慢这些垂直业务团队的开发速度。 
 
为索引文档构建事件驱动的管道 
我们通过构建一个新的搜索索引平台来解决这些问题,该平台提供快速可靠的索引以支持不同的垂直行业,同时还提高了搜索性能和搜索团队的生产力。它使用 Kafka 作为消息队列和数据存储,使用 Flink 进行数据转换并将数据发送到 Elasticsearch。

上面的图 1 显示了我们搜索索引管道中的各种组件。这些组件分为四个部分:

  • 数据源:这些是对数据进行CRUD 操作的系统。我们称它们为数据的真实来源。在我们的堆栈中,我们使用Postgres作为数据库,使用Snowflake作为数据仓库。 
  • 数据目的地:这是针对搜索进行了优化的数据存储。在我们的例子中,我们选择了 Elasticsearch。
  • Flink 应用程序:我们在索引管道中添加了两个自定义 Flink 应用程序,用于转换数据的汇编器和用于将数据发送到目标存储的接收器。组装人员负责组装 Elasticsearch 文档中所需的所有数据。接收器负责根据模式塑造文档并将数据写入目标 Elasticsearch 集群。
  • 消息队列:我们使用 Kafka 作为我们的消息队列技术。上面图 1 中的 Kafka 2 组件使用压缩并无限期保留的日志主题。

这些组件结合在一起构成了端到端的数据管道。使用 Kafka 将数据源中的数据更改传播到 Flink 应用程序。Flink 应用程序实现业务逻辑来管理搜索文档并将其写入目的地。现在我们了解了高级组件,让我们来看看不同的索引用例。

 
索引更改数据捕获 (CDC) 事件 
DoorDash 的商家数据不断创建和更新,需要通过我们的索引管道解决方案来解决。例如,这些更新可以是从商家运营商向商店添加标签到更新菜单的任何内容。我们需要尽快将这些变化反映在消费者体验上,否则消费者将在应用程序中看到陈旧的数据。这些对平台的更新保存在数据存储中,例如PostgresApache Cassandra。迭代工作流还以每天的节奏处理数据仓库中的数据,为商业智能应用程序等提供动力。
为了从服务的数据库中可靠地捕获这些更新事件,我们探索了使用Debezium 连接器Aurora /Postgres启用变更数据捕获(#CDC),Debezium 连接器是一个红帽开发的用于捕获行级更改的开源项目。存储团队进行的初始性能测试表明,这种策略开销太大,性能不佳,尤其是当服务使用相同的数据库来服务在线流量时。因此,我们在应用程序中实现了保存钩子,它负责处理数据更新请求,每当底层数据存储发生更改时,通过 Kafka 传播更改事件。我们称这种方法为应用级 CDC。
使用应用程序级 CDC,我们可能会遇到一致性问题。一个分布式应用程序有多个实例。可以通过两个不同的实例提供两个单独的更新调用。如果我们在 Kafka 消息中包含更新的值,则无法保证一致性并解决问题,因为在某些情况下,应用程序的多个实例会推送更新相同值的事件。 
例如,如果应用程序实例 #1 发送一个事件 ,{store_id: 10, is_active=true}而应用程序实例 #2 发送一个事件{store_id: 10, is_active=false},则在消费者端会发生冲突。
为确保一致性,我们仅在 Kafka 事件中发送更改的实体 ID。收到 Kafka 事件后,我们的 Assembler 应用程序调用应用程序上的REST API 以收集有关 Kafka 事件中存在的实体的其他信息。REST API 调用可确保有关实体的数据一致性。汇编器合并信息以创建一个事件,该事件将其推送到 Kafka 以供 Sink 应用程序使用。汇编器实现了窗口化重复数据删除,可防止在指定时间内多次调用同一实体的 REST API。汇编器还进行事件聚合,以便批量调用 REST 端点。例如,在 10 秒内,它会汇总商店的商品更新。它为该存储调用 REST API,包括所有重复数据删除和聚合的项目。
总而言之,我们使用应用程序级 CDC 来捕获数据更改事件。我们通过简化的事件和 REST API 解决一致性问题。我们使用重复数据删除和窗口函数来优化事件处理。 
 
索引 ETL 数据
商店和商品文档的许多属性对我们的检索过程至关重要,例如机器学习模型生成的分数和标签,每天批量更新一次。这些数据要么是模型生成的,就像机器学习模型运行最新的数据一样,要么是人工策划的,比如我们的操作员手动为特定商店用“鸡”标记商品。在每晚运行相应的 ETL 作业后,这些数据会填充到我们数据仓库中的表中。 
在使用新的搜索索引平台之前,我们没有将数据上传到索引的可靠方法,而是使用缓慢且不精确的解决方法。我们希望通过为我们的新搜索索引平台提供在 24 小时内将 ETL 数据可靠地摄取到我们的索引中的机制来改进我们现有的管道。 
ETL 用例的 CDC 模式与上一节中描述的增量更新案例非常不同。在增量更新的情况下,商家数据存储不断更新,导致一天中不断更新。另一方面,对于 ETL 用例,当 ETL 运行时,更新会同时发生,在下一次运行之前没有其他更新。
我们决定不对 ETL 源使用应用程序级 CDC 的变体,因为每次 ETL 运行时我们都会看到更新的大高峰,而这个高峰可能会给我们的系统带来过度压力并降低性能。相反,我们想要一种机制来在一段时间内分散 ETL 摄取,这样系统就不会不堪重负。
作为前进的方向,我们开发了一个自定义 Flink 源函数,该函数定期将 ETL 表中的所有行分批流式传输到 Kafka,其中选择批量大小以确保下游系统不会不堪重负。 
 
将文档发送到 Elasticsearch
一旦 Assembler 应用程序将数据发布到目标主题,我们就会有一个消费者读取hydrated水合消息,根据特定的索引模式转换消息,并将它们发送到相应的索引。此过程需要管理架构、索引和集群。我们为每个 ElasticSearch 索引维护一个唯一的 Kafka 消费者组,以便消费者可以维护每个索引的偏移量。为了转换消息,我们使用了一个 DocumentProcessor(s),它从目标主题接收一个水合事件并输出准备好被索引的格式化文档。 
Sink 进程利用Flink Elasticsearch Connector将 JSON 文档写入 Elasticsearch。开箱即用,它具有速率限制和节流功能,这对于在系统处于高写入负载下时保护 Elasticsearch 集群至关重要。该流程还支持批量索引,我们在一个时间窗口内收集所有文档和相关操作并执行批量请求。索引文档的任何失败都会导致文档被记录并存储在死信队列中,该队列可以在以后处理。
 
快速回填新索引
通常,我们可能希望向索引添加新属性,例如将与商店或商品关联的市场 ID 添加到文档中,因为它有助于我们进行分片。同样,我们可能需要快速重新创建一个新索引,例如当我们想尝试不同的索引结构来运行效率基准时。 
在遗留系统中,我们依赖于一个缓慢且不可靠的工作,通常需要一个月的时间来重新索引所有商店和项目文档。鉴于索引持续时间长,很难正确估计与重新索引过程相关的错误率。因此,我们从未确定索引质量。我们经常收到关于索引和真实来源之间的商店详细信息不匹配的投诉,必须手动修复。
使用我们的新搜索索引平台,我们希望有一个流程能够在 24 小时内快速重新创建新索引或回填现有索引中的属性。对于引导过程,我们需要一种机制来快速重新创建所有需要在 Elasticsearch 中建立索引的文档。这个过程包括两个步骤: 

  1. 流式传输与需要在 ElasticSearch 中索引的文档对应的所有实体 ID 
  2. 在将实体 ID 发送到下游进行索引之前,通过进行外部调用将实体 ID 映射到其最终形式。 

将实体 ID 映射到实体的最终形式的管道已经建立,作为我们在上面提到的在线组装器工作的一部分。因此,所需要的只是流式传输所有需要在 Elasticsearch 中建立索引的文档 ID。因此,我们维护需要在数据仓库的引导表中索引的所有实体 ID 的最新副本。当我们需要引导时,我们使用 ETL 部分中描述的源函数将这些引导表中的所有行流式传输到 Kafka。我们封装了在单个作业中执行上述两个步骤的逻辑。
如果我们在引导管道的同时运行增量索引管道,我们就有在 Elasticsearch 中获取过时数据的风险。为了避免这些问题,我们在每次引导程序运行时缩减增量索引器,并在引导程序完成后将其重新扩展。
综上所述,我们回填和重新创建索引的步骤如下:
  • 创建索引并根据需要更新其属性,并更新组装器和接收器中的业务逻辑和配置以填充新属性。
  • 缩小在线汇编程序。 
  • 扩大引导作业。
  • 引导程序完成后,缩减引导程序作业并扩展在线汇编程序。一旦偏移变为最近,引导过程就完成了。

 
启用强制重新索引功能 
有时,我们在 Elasticsearch 中的一些文档可能有过时的数据,这可能是因为上游的某些事件没有交付,或者我们的一项下游服务响应时间过长。在这种情况下,我们可以强制重新索引任何有问题的文档。 
为完成此任务,我们发送一条消息,其中包含要索引到在线组装器从中获取数据的主题中的实体的 ID。一旦消息被消费,我们上面描述的索引管道就会启动,每个文档都会在 Elasticsearch 中重新索引。
我们用唯一的标签对一次性索引任务中发送的消息进行注释,这为我们提供了文档在通过索引流的各个阶段时的详细跟踪。除了向我们提供文档确实被索引的保证之外,它还为我们提供了丰富的调试信息,帮助我们验证并帮助发现任何可能阻止它首先被索引的错误。
 

结果
我们新的搜索索引平台更可靠。增量索引速度有助于更快地刷新数据,并在我们的消费者应用程序中更迅速地出现。更快的重新索引使我们能够在短时间内构建新索引以改进我们的检索: 

  • 将回填我们整个商店目录的时间从一周缩短到 6.5 小时
  • 将回填我们整个项目目录的时间从两周减少到 6.5 小时
  • 将平台上现有商店和商品重新编制索引的时间从 1 周减少到 2 小时