优步大数据平台处理100多PB数据却只有分分钟延迟!


优步致力于在全球市场上提供更安全,更可靠的运输服务。为实现这一目标,优步在很大程度上依赖于在各个层面制定数据驱动的决策,从预测高流量事件期间的需求到识别到解决我们的驾驶员 - 合作伙伴注册流程中的瓶颈。随着时间的推移,更多要求洞察力的分析需求导致超过100PB的分析数据,这些都需要通过基于Hadoop的大数据平台以最小的延迟进行清洗、存储和服务。自2014年以来,我们一直致力于开发大数据解决方案,确保数据可靠性,可扩展性和易用性,并且现在专注于提高我们平台的速度和效率。  

在本文中,我们将深入探讨Uber的Hadoop平台之旅,并讨论我们接下来要构建的内容,以扩展这个丰富而复杂的生态系统。

第1代:优步的大数据开始
在2014年之前,我们有限的数据量可以适用于一些传统的在线事务处理(OLTP)数据库(在我们的例子中,MySQL和PostgreSQL)。为了利用这些数据,我们的工程师必须单独访问每个数据库或表,如果用户需要组合来自不同数据库的数据,则需要编写自己的代码。那时,我们没有全局访问或全部查看我们存储的所有数据。实际上,我们的数据分散在不同的OLTP数据库中,总数据大小大约为几TB,访问这些数据的延迟非常快(通常是亚分钟)。


2014年之前,Uber存储的数据总量足够小,可以容纳一些传统的OLTP数据库。没有数据的全局视图,数据访问速度很快。

随着优步的业务呈指数级增长(无论是在我们运营的城市/国家数量以及每个城市使用该服务的乘客/司机数量),传入数据量也增加,并且需要访问和分析所有一个地方的数据要求我们建立第一代分析数据仓库。为了使Uber尽可能地受数据驱动,我们需要确保分析师可以在一个地方访问分析数据。为实现这一目标,我们首先将数据用户分为三大类:

1. 城市运营团队(数千名用户):这些实地工作人员管理和扩展优步在每个市场的运输网络。随着我们的业务扩展到新的城市,有成千上万的城市运营团队定期访问这些数据,以响应特定于驾驶员和车手的问题。
2. 数据科学家和分析师(数百名用户):这些分析师和科学家分布在不同的功能组中,这些功能组需要数据来帮助我们为用户提供最佳的运输和交付体验,例如,在预测司机需求以便面向未来时服务。
3.工程团队(数百名用户):整个公司的工程师专注于构建自动数据应用程序,例如我们的欺诈检测和驾驶员入门平台。

我们的第一代分析数据仓库专注于在一个地方聚合优步的所有数据以及简化数据访问。对于前者,我们决定使用Vertica作为我们的数据仓库软件,因为它具有快速,可扩展和面向列的设计。我们还开发了多个临时ETL(提取,转换和加载)作业,这些作业将来自不同来源(即AWS S3,OLTP数据库,服务日志等)的数据复制到Vertica中。为了实现后者,我们将SQL标准化为我们的解决方案接口,并构建了一个在线查询服务来接受用户查询并将它们提交给底层查询引擎。

第一代优步大数据平台允许我们在一个地方聚合所有优步数据,并为用户提供标准SQL界面来访问数据。

我们的第一个数据仓库服务的发布对整个公司的工程师来说都是一个巨大的成功。用户第一次拥有全局视图,可以在一个位置访问所有数据。这导致大量新团队使用数据分析作为其技术和产品决策的基础。在几个月内,我们的分析数据量增长到数十TB,用户数量增加到数百。

使用SQL作为简单的标准接口,使城市运营商能够轻松地与数据交互,而无需了解底层技术。此外,不同的工程团队开始构建针对用户需求量身定制的服务和产品,这些服务和产品由这些数据(即uberPool,前期定价等)提供信息,并组建了新的团队以更好地使用和提供这些数据(即我们的机器学习)和实验团队)。

限制
另一方面,我们的数据仓库和传入数据的广泛使用揭示了一些限制。由于数据是通过临时ETL作业摄取的,而我们缺乏正式的模式通信机制,因此数据可靠性成为一个问题。我们的大多数源数据都是JSON格式,并且摄取作业对生产者代码的更改没有弹性。

随着公司的发展,扩展数据仓库变得越来越昂贵。为了降低成本,我们开始删除旧的,过时的数据,以释放新数据的空间。除此之外,我们的大部分大数据平台都不能横向扩展,因为主要目标是解除对集中数据访问或查看的关键业务需求,并且没有足够的时间来确保所有部分都是水平可扩展的。我们的数据仓库实际上被用作数据湖,堆积所有原始数据以及执行所有数据建模和服务数据。

此外,由于生成数据的服务与下游数据消费者之间缺乏正式合同,将数据摄入数据仓库的ETL作业也非常脆弱(使用灵活的JSON格式导致缺乏架构执行来源数据)。如果不同的用户在摄取期间执行不同的转换,则可以多次摄取相同的数据。这对我们的上游数据源(即在线数据存储)造成了额外的压力,并影响了他们的服务质量。此外,这导致我们的仓库中存储了几个相同数据的副本,进一步增加了存储成本。在数据质量问题的情况下,回填非常耗费时间和劳力,因为ETL作业是临时的和源依赖的,并且在摄取期间进行数据预测和转换。由于我们的摄取工作缺乏标准化,因此很难摄取任何新的数据集和类型。

 

第2代:Hadoop的到来
为了解决这些限制,我们围绕Hadoop生态系统重新构建了我们的大数据平台。更具体地说,我们引入了一个Hadoop数据湖,其中所有原始数据仅从不同的在线数据存储中提取一次,并且在摄取期间没有转换。这种设计转变显着降低了我们在线数据存储的压力,使我们能够从临时摄取作业转变为可扩展的摄取平台。为了让用户能够访问Hadoop中的数据,我们引入了Presto来实现交互式临时用户查询,Apache Spark促进了对原始数据(以SQL和非SQL格式)的编程访问,以及Apache Hive作为极大查询的主力。这些不同的查询引擎允许用户使用最能满足其需求的工具,使我们的平台更加灵活和易于访问。

为了保持平台的可扩展性,我们确保所有数据建模和转换仅在Hadoop中进行,从而在出现问题时实现快速回填和恢复。只有最关键的建模表(即,城市运营商实时利用这些表来运行纯粹,快速的SQL查询)才被转移到我们的数据仓库。这大大降低了运行庞大数据仓库的运营成本,同时还将用户引导到基于Hadoop的查询引擎,这些查询引擎的设计考虑了他们的特定需求。

我们还利用了Apache Parquet的标准列式文件格式,考虑到提供分析查询的列式访问,提高了压缩率和计算资源收益,从而节省了存储空间。此外,Parquet与Apache Spark的无缝集成使该解决方案成为访问Hadoop数据的流行选择。下面的图3总结了我们的第二代大数据平台的架构:

结合Parquet,Spark和Hive等技术,可以摄取,存储和提供数十PB的数据。
除了整合Hadoop数据湖之外,我们还使该生态系统中的所有数据服务都可以横向扩展,从而提高了我们的大数据平台的效率和稳定性。特别是,具有这种通用的水平可扩展性以满足直接的业务需求,使我们能够集中精力构建下一代数据平台,而不是临时解决问题。

与第一代数据管道易受上游数据格式更改影响的平台不同,我们的第二次迭代允许我们对所有数据进行模式化,从JSON转换为Parquet以将模式和数据存储在一起。为此,我们构建了一个集中模式服务来收集,存储和提供模式以及不同的客户端库,以便将不同的服务集成到此中央模式服务中。脆弱的临时数据摄取作业被替换为标准平台,以将其原始嵌套格式的所有源数据传输到Hadoop数据湖中。通过Hadoop中的水平可伸缩批处理作业摄取后,对数据进行了任何必需的操作和转换。

随着优步的业务继续以轻快的速度扩展,我们很快就拥有了数十亿的数据。每天都有数十TB的新数据被添加到我们的数据湖中,我们的大数据平台增长到超过10,000个vcores,在任何一天都有超过100,000个批处理作业。这导致我们的Hadoop数据湖成为所有分析Uber数据的集中真实来源。

限制
随着公司不断扩展并在我们的生态系统中存储了数百PB的数据,我们面临着一系列新的挑战。

首先,存储在我们的HDFS中的大量小文件(由于摄取更多数据以及更多用户编写生成更多输出数据的临时批处理作业)开始在HDFS NameNode上增加额外的压力。最重要的是,数据延迟仍然远远超出了我们的业务需求。用户只能每24小时访问一次新数据,这对于做出实时决策来说太慢了。将ETL和建模移动到Hadoop使得这个过程更具可扩展性,这些步骤仍然是瓶颈,因为这些ETL作业必须在每次运行中重新创建整个建模表。除此之外,新数据的摄取和相关派生表的建模都基于创建整个数据集的新快照并交换旧表和新表以向用户提供对新数据的访问。摄取作业必须返回源数据存储区,创建新快照,并在每次运行期间将整个数据集提取或转换为可消耗的柱状Parquet文件。随着我们的数据存储量的增长,这些工作可能需要超过20个小时才能运行超过1,000个Spark执行程序。

每项工作的很大一部分涉及从最新快照转换历史数据和新数据。虽然每个表每天只添加超过100千兆字节的新数据,但每次运行的摄取作业都必须转换该特定表的整个超过100 TB的数据集。对于在每次运行中重新创建新派生表的ETL和建模作业也是如此。这些作业必须依赖基于快照的源数据摄取,因为历史数据的更新比例很高。从本质上讲,我们的数据包含许多更新操作(即骑手和驾驶员评级或支持票价调整,在完成旅行后的几个小时甚至几天)。由于HDFS和Parquet不支持数据更新,因此从更新的源数据创建新快照所需的所有提取作业,将新快照摄取到Hadoop中,将其转换为Parquet格式,然后交换输出表以查看新数据。

虽然Hadoop在我们的大数据平台中启用了数PB的数据存储,但新数据的延迟仍然超过一天,由于基于快照的大型上游源表需要几个小时到达而导致延迟处理。
 

第3代:长期重建我们的大数据平台
到2017年初,我们的大数据平台被整个公司的工程和运营团队使用,使他们能够在一个地方访问新的和历史数据。用户可以通过根据他们的需求量身定制的单一UI门户轻松访问Hive,Presto,Spark,Vertica,Notebook和更多仓库选项中的数据。我们的计算集群中有超过100 PB的数据,每天100,000个Presto查询,每天10,000个Spark作业,每天20,000个Hive查询,我们的Hadoop分析架构遇到了可扩展性限制,许多服务受到了很高的影响数据延迟。

幸运的是,由于我们的底层基础架构可以横向扩展以满足当前的业务需求,因此我们有足够的时间研究数据内容,数据访问模式和用户特定需求,以便在构建下一代之前确定最紧迫的问题。我们的研究揭示了四个主要的痛点:

HDFS可扩展性限制:许多依赖HDFS扩展其大数据基础架构的公司都面临着这个问题。根据设计,HDFS受其NameNode容量的瓶颈,因此存储大量小文件会显着影响性能。当数据大小超过10 PB并且成为超过50-100 PB的真正问题时,通常会发生此限制。

幸运的是,有一些相对简单的解决方案可以将HDFS从几十个扩展到几百个PB,例如利用ViewFS和使用HDFS NameNode Federation。通过控制小文件的数量并将数据的不同部分移动到单独的集群(例如,HBase和Yarn应用程序日志移动到单独的HDFS集群中),我们能够减轻此HDFS限制。

Hadoop中更快的数据:Uber的业务实时运营,因此,我们的服务需要访问尽可能新鲜的数据。因此,对于许多用例而言,24小时数据延迟太慢了,并且对更快的数据传输存在巨大需求。我们的第二代大数据平台基于快照的摄取方法效率低下,阻止我们以较低的延迟摄取数据。为了加快数据交付速度,我们不得不重新设计管道,以便仅增量提取更新数据和新数据。

支持Hadoop和Parquet中的更新和删除:优步的数据包含很多更新,包括过去几天的年龄(例如,骑车或驾驶员 - 合作伙伴调整最近的旅行票价)到几周(例如,骑车人在下次他们的最后一次旅行时评级新旅行)甚至几个月(例如,由于业务需要回填或调整过去的数据)。通过基于快照的数据提取,我们每24小时提取一次源数据的新副本。换句话说,我们一次摄取所有更新,每天一次。但是,由于需要更新的数据和增量摄取,我们的解决方案必须能够支持现有数据的更新和删除操作。但是,由于我们的大数据存储在HDFS和Parquet中,因此无法直接支持对现有数据的更新操作。另一方面,我们的数据包含非常宽的表(大约1,每个表有000个列,具有五个或更多级别的嵌套,而用户查询通常只触及其中一些列,从而阻止我们以经济高效的方式使用非柱状格式。为了准备我们的大数据平台以实现长期增长,我们必须找到一种方法来解决HDFS文件系统中的这种限制,以便我们也可以支持更新/删除操作。

更快的ETL和建模:与原始数据摄取类似,ETL和建模作业基于快照,要求我们的平台在每次运行中重建派生表。为减少建模表的数据延迟,ETL作业也需要增量。这要求ETL作业逐步从原始源表中提取已更改的数据,并更新先前派生的输出表,而不是每隔几小时重建整个输出表。

介绍Hudi
考虑到上述要求,我们构建了Hadoop Upserts和增量(Hudi),这是一个开源Spark库,在HDFS和Parquet之上提供抽象层,以支持所需的更新和删除操作。Hudi可以在任何Spark作业中使用,可以横向扩展,并且只依赖于HDFS来运行。因此,任何需要支持历史数据更新/删除操作的大数据平台都可以利用Hudi。

Hudi使我们能够在Hadoop中更新,插入和删除现有的Parquet数据。此外,Hudi允许数据用户逐步提取仅更改的数据,显着提高查询效率并允许派生建模表的增量更新。

我们的Hadoop生态系统中的原始数据是根据时间划分的,任何旧分区都可能在以后接收更新。因此,对于依赖于这些原始源数据表的数据用户或ETL作业,了解哪个日期分区包含更新数据的唯一方法是扫描整个源表并根据一些已知的时间概念过滤掉记录。这导致计算成本高昂的查询,需要完整的源表扫描并防止ETL作业非常频繁地运行。

使用Hudi,用户可以简单地传递他们的最后一个检查点时间戳并检索所有已更新的记录,无论这些更新是否是添加到最近日期分区的新记录或更新旧数据(例如,今天发生的新旅行与从6个月前开始的更新行程),无需运行扫描整个源表的昂贵查询。

使用Hudi库,我们能够从基于快照的原始数据提取转移到增量提取模型,使我们能够将数据延迟从24小时减少到不到一小时。下面的图5描述了Hudi成立后我们的大数据平台:  

我们的大数据平台的第三代采用了更快的增量数据提取(使用我们的开源Marmaray框架),以及通过我们的开源Hudi库更高效地存储和提供数据。

通用数据摄取
Hudi并不是第三代大数据平台的唯一补充。我们还通过Apache Kafka正式确定了存储和大数据团队之间上游数据存储区更改的移交。上游数据存储事件(以及来自不同应用程序和服务的经典日志消息)使用统一的Avro编码流入Kafka,包括附加的标准全局元数据头(即时间戳,行键,版本,数据中心信息和发起主机)。Streaming和Big Data团队都使用这些存储更改日志事件作为其源输入数据以进行进一步处理。

我们的数据提取平台Marmaray以小批量运行并从Kafka获取上游存储更改日志,使用Hudi在Hadoop上的现有数据之上应用它们库包。如前所述,Hudi支持upsert操作,允许用户添加新记录并更新或删除历史数据。摄取Spark作业每10-15分钟运行一次,在Hadoop中提供30分钟的原始数据延迟(具有1-2个摄取作业失败或重试的余量)。为避免因多次将相同的源数据提取到Hadoop而导致效率低下,我们的设置不允许在原始数据摄取期间进行任何转换,从而导致我们决定使原始数据提取框架成为EL平台,而不是传统的ETL平台。在此模型下,鼓励用户在上游数据以其原始嵌套格式登陆后,在Hadoop内以批处理模式执行所需的转换操作。

自从对我们的大数据平台实施这些更改以来,我们通过避免不必要或低效的提取操作来节省了大量的计算资源。我们的原始数据的可靠性也得到了显着提高,因为我们现在可以避免在摄取过程中容易出错的转换。现在,用户可以使用任何大数据处理引擎在原始源数据之上运行转换。此外,如果出现任何问题,用户可以再次重新运行其转换,并通过使用更多计算资源和更高程度的并行性来更快地完成批转换作业,从而满足其SLA。  

增量数据建模
考虑到需要摄入Hadoop的大量上游数据存储(截至2017年超过3,000个原始Hadoop表),我们还构建了一个通用的摄取平台,以便以统一和可配置的方式将原始数据提取到Hadoop中。现在,我们的大数据平台以递增的方式更新原始Hadoop表,数据延迟为10-15分钟,允许快速访问源数据。但是,为了确保模型化表也具有低延迟,我们必须避免在建模ETL作业中的低效率(即完整派生表重新创建或完整源原始表扫描)。实际上,Hudi允许ETL作业仅从源表中获取已更改的数据。

在ETL作业期间使用Hudi编写器使我们能够更新派生建模表中的旧分区,而无需重新创建整个分区或表。因此,我们的建模ETL作业使用Hudi读取器逐步从源表中获取已更改的数据,并使用Hudi编写器逐步更新派生的输出表。现在,ETL作业也在不到30分钟内完成,为Hadoop中的所有派生表提供不到一小时的端到端延迟。

为了向Hadoop表的数据用户提供访问所有数据或仅访问新数据或更新数据的不同选项,使用Hudi存储格式的Hadoop原始表提供了两种不同的读取模式:

1. 最新模式视图。在该时间点提供整个Hadoop表的整体视图。此视图包括所有记录的最新合并值以及表中的所有现有记录。

2. 增量模式视图。仅根据给定时间戳从特定Hadoop表中获取新记录和更新记录。此视图仅返回自最近检查点以来最近插入或已更新的行。此外,如果特定行自上一个检查点以来被多次更新,则此模式将返回所有这些中间更改的值(而不是仅返回最新的合并行)

下面的图6描述了以Hudi文件格式存储的所有Hadoop表的这两个读取视图:

通过Hudi writer更新的原始表可以以两种不同的模式读取:最新模式视图返回所有记录的最新值,增量模式视图仅返回自上次读取后更新的记录。

用户通常根据需要在这两个表视图之间切换。当他们运行即席查询以基于最新状态分析数据时,他们使用该表的最新模式视图(例如,获取美国每个城市的每周总旅行次数)。另一方面,当用户具有自其最近执行以来仅需要获取已更改或新记录的迭代作业或查询时,他们使用增量模式视图。两个视图始终可用于所有Hadoop表,用户可以根据需要在不同模式之间切换。

标准化数据模型
除了提供同一个表的不同视图外,我们还标准化了我们的数据模型,为所有原始Hadoop数据提供了两种类型的表:

更改日志历史记录表。包含为特定上游表收到的所有更改日志的历史记录。此表使用户能够扫描给定表的更改历史记录,并且可以按键合并以提供每行的最新值。
合并快照表。容纳上游表的最新合并视图。此表包含每个密钥接收的所有历史更改日志的压缩合并视图。

下面的图7描述了如何使用给定更改日志流为特定上游源数据存储生成不同的Hive原始表:

标准化我们的Hive数据模型改善了整个大数据生态系统的数据质量。此模型包含一个合并的快照表,其中包含每个row_key的最新值以及包含每个row_key的所有值更改历史记录的changelog历史记录表。

但是,更改日志流可能包含也可能不包含给定键的整个行(所有列)。虽然合并的快照表始终提供特定键的所有列,但如果更改日志的上游流仅提供部分行更改日志,则更改日志历史表可能是稀疏的,这一功能通过避免仅在一个或几个时重新发送整行来提高效率有限的列值已更改。如果用户希望从更改日志历史记录表中获取更改的值并将其与合并的快照表连接以创建完整的数据行,我们还会在更改日志历史记录表中的合并快照表中包含相同密钥的日期分区。

下面的图8总结了我们的大数据平台的不同组件之间的关系:

图8:构建更具可扩展性的数据传输平台使我们能够在一种服务下以标准方式轻松聚合所有数据流水线,并支持任何数据源和数据接收器之间的任意连接。
 

第4代:下一步是什么?
自2017年推出第三代大数据平台以来,整个公司的用户可以快速可靠地访问Hadoop中的数据,但总有增长空间。下面我们总结一下我们正在努力增强优步的大数据平台,以提高数据质量,数据延迟,效率,可扩展性和可靠性。

1. 数据质量
为了提高数据质量,我们确定了两个需要改进的关键领域。首先,当某些上游数据存储在存储之前没有强制执行或检查数据模式时(例如,存储值为JSON blob的键值),我们希望避免非符合模式的数据。这导致不良数据进入我们的Hadoop生态系统,从而影响所有依赖此数据的下游用户。为了防止不良数据的涌入,我们正在将所有上游数据存储转换为对数据内容执行强制性模式检查,并在数据存在任何问题(例如,未使用模式确认)时拒绝数据条目。

我们发现问题的第二个方面是实际数据内容的质量。虽然使用模式确保数据包含正确的数据类型,但它们不检查实际数据值(例如,整数而不是[0,150]之间的正数)。为了提高数据质量,我们正在扩展我们的架构服务以支持语义检查。这些语义检查(换句话说,Uber特定的数据类型)允许我们在基本结构类型检查之外的实际数据内容上添加额外约束。

2. 数据延迟
我们的目标是将Hadoop中的原始数据延迟减少到五分钟,将建模表的数据延迟减少到十分钟。这将允许更多用例从流处理转向使用Hudi增量数据拉取的更有效的小批量处理。

我们还在扩展我们的Hudi项目,以支持其他视图模式,其中包括现有的读取优化视图,以及显示延迟仅几分钟的数据的新实时视图。这个实时视图依赖于我们称之为Merge-On-Read或Hudi 2.0的开源解决方案(以及Hudi的一部分)。

3. 数据效率
为了提高数据效率,我们正在不再依赖专用硬件来实现任何服务和服务Docker化。此外,我们统一了Hadoop生态系统内部和跨Hadoop生态系统的所有资源调度程序,以弥合整个公司的Hadoop和非数据服务之间的差距。这允许所有作业和服务以统一的方式进行调度,而不管它将被执行的媒介。随着Uber的发展,数据位置将成为Hadoop应用程序的一大关注点,成功的统一资源管理器可以将所有现有的调度程序集中在一起(即Yarn,Mesos和Myriad)。

4.可扩展性和可靠性
作为我们改善平台可扩展性和可靠性的努力的一部分,我们确定了与可能的边缘情况相关的几个问题。虽然我们的摄取平台是作为通用的可插拔模型开发的,但实际摄取上游数据仍然包括许多依赖于源的管道配置,使得摄取管道易碎并且增加了操作数千个这些管道的维护成本。

为了确保我们无论数据来源如何都能统一数据摄取,我们与Uber的数据存储团队合作启动了一个项目,以统一来自所有上游数据源的更改日志的内容,格式和元数据,而不管其技术构成如何。该项目将确保有关这些特定上游技术的信息只是添加到实际更改日志值的附加元数据(而不是具有完全不同的更改日志内容和不同数据源的元数据),并且无论上游源是什么,都将发生数据提取。

最后,我们的下一个版本的Hudi将允许我们在几分钟内为所有数据源生成更大的Parquet文件(超过一千兆字节,相比于我们当前的128兆字节)。它还将消除围绕更新与插入的比率的任何敏感性。Hudi 1.0依赖于一种名为copy-on-write的技术,只要有更新的记录,它就会重写整个源Parquet文件。这显着增加了写入放大,特别是当更新与插入的比率增加时,并且阻止在HDF中创建更大的Parquet文件。Hudi的新版本旨在通过将更新的记录存储在单独的增量文件中并基于给定策略将其与基础Parquet文件异步合并(例如,当有足够数量的更新数据来分摊重写大基数的成本时)来克服此限制实木复合地文件)。将Hadoop数据存储在较大的Parquet文件中以及更可靠的源独立数据摄取平台将使我们的分析数据平台在未来几年随着业务的蓬勃发展而继续增长。


向前进
优步的数据组织是数据平台,数据基础,流媒体和实时平台以及大数据团队之间的跨职能协作,旨在构建支持优步分析数据基础架构的所需库和分布式服务。