Trivagoj为何从Hive/SQL迁移到PySpark/Python?


Trivago是一个以拍卖为基础的市场。广告商可以通过拍卖购买指定酒店的查询结果页面中的重要位置,trivago的拍卖机制将几个因素:价格、点击率和出价本身,以确定拍卖获胜者。当您访问trivago的页面并搜索位置或关键字时,会运行实时拍卖机制来确定获胜的广告客户,即哪个广告客户成为“查看交易”按钮上的广告客户。
在trivago  或任何其他以拍卖为基础的市场上竞争需要高水平的专业知识和专门的数据科学家团队。trivago的首要任务是创造一个公平的市场; 精致和精品广告客户应该具有竞争力并为我们的用户带来价值。作为此战略愿景的一部分,trivago创建了一个专门负责竞标主题的团队,以帮助广告客户优化其在我们的市场中运行的广告系列。

机器学习
机器学习(ML)工程和软件开发一样基本上都是关于编写正确且强大的算法。在ML工程中,当使用数字的浮点表示时,我们需要确保数学正确性和避免计算中的舍入误差传播方面存在额外的问题。
因此,ML工程和软件开发一样同样面临许多挑战......以及对这些挑战提出一些解决方案。像单元测试和持续集成这样的概念很快就进入了ML的行话,成为数据科学家以及从事ML工程的数值科学家常用的工具集。在2018年,不信任来自没有单元测试的程序是不可思议的。更难以信任第三方数据库包,这些数据库包没有像其他软件开发项目那样严格测试。
在trivago,数据科学家负责开发和维护生产中的算法,模糊了数据科学家和ML工程师之间的界限。这可以转化为全面的数据科学家,可以同样讨论统计和工程概念。

点击价值
我是为trivago广告商开发和实施出价算法和策略的团队的一员。这些策略旨在提高广告支出回报率(ROAS)或收入分成等业务指标的效率。创建有效出价策略的第一步是估算trivago市场上广告客户每次点击的真实价值。我们将此估算称为每次点击价值(VPC)。
我们使用贝叶斯统计数据来估算数亿家酒店和广告商的VPC。VPC估算程序每天运行,包含前一天收集的千兆字节数据形式的证据。在VPC模型之上,我们开发了一个简单的启发式逻辑 - 称为出价层 - 来计算拍卖的最终出价。鉴于trivago是一次性拍卖,有效的竞价策略不仅包括对点击的真实估值的估计,还需要一个拍卖环境的模型,即其他参与者的估值模型以及拍卖中的项目。

Hadoop生态系统和Hive
VPC算法的第一个版本由数据管道组成,该数据管道通过使用Apache Hive读取/写入表格数据到Hadoop分布式文件系统(HDFS)来更新我们的VPC概率分布,Apache Hive是一种将SQL转换为高效MapReduce的SQL方言工作。
只要我们坚持使用简单的贝叶斯模型来计算出价层中的VPC和简单启发式,这就很有效。
随着时间的推移,很明显使用Hive并不是开发我们在sprint计划中梦寐以求的竞标系统的稳定选择。由于Hive的SQL方言缺乏表现力,与Hive合作限制了团队的创造力 - 我们在此过程中发现的其他问题改变了团队对我们想要开发算法的理想技术的看法。

问题
1.单元测试
我们尝试了不同的方法为Hive代码编写单元测试但收效甚微:大多数现有框架的单元测试必须用Java编写,其他框架则处于alpha状态。我们决定更轻松地复制生产环境,将数据和配置文件复制到另外备份环境,并在那里运行我们的新算法,而不是学习编写Java代码。
同样,这种方法运行良好一段时间,但随着时间的推移,我们发现自己编写了越来越多的临时表,其结果来自中间计算步骤以调试代码。通过此设置,查找错误所需的时间与管道中的中间表/步骤数成正比。并不是很愉快。
这种测试练习消耗了团队的时间,并没有真正捕捉到重要的错误。简而言之,我们的测试阶段更多的是安慰剂,而不是提高代码质量的有效技术。用Java编写测试对我们来说不是一个选择。
单元测试的前提是它会快速运行; 在最好的情况下,测试将在几秒钟内完成。因为我们正在讨论大量数据,所以测试我们算法中的单个更改意味着需要等待3到5个小时才能完成整个管道运行。数据工程师建议对数据进行采样以使测试阶段运行得更快,但由于算法的特定特征,采样将导致估计验证的显着增加。

2.生产时间
我们的单元测试问题让团队非常清楚在修改代码库时我们需要多么小心。有时我们不得不花费数天时间测试和验证我们的更改结果 - 即使是一行!这成为了尽可能快地按照团队需要迭代算法的主要障碍。

3.表现
Hive与处理表格数据一样好而且稳健,我们的算法很快演变成计算,需要一种编程语言,允许团队以简单而优雅的方式表达想法。
我们使用R进行行为分析,因此我们习惯于dplyr在R中表达表格数据操作的方式:

input_data %>%
   filter(country == 'Germany') %>%
   mutate(revenue = clicks * cpc) %>%
   group_by(advertiser) %>%
   summarise(
       total_booking_amount = sum(booking_amount),
       total_revenue = sum(revenue)) %>%
   mutate(revenue_share = total_revenue / total_booking_amount)

行为分析是我们的出价层启发式的基础。将我们的分析结果转换为SQL是一个令人沮丧的过程,给每个参与者留下了伤疤。
SQL和dplyr都是为了转换表格数据而设计的,但与SQL相比,使用R + dplyr开辟了新的可能性。在R上轻松轻松的操作,比如操作日期,可能会成为Hive / SQL上的噩梦和难以处理的操作。

4.速度
我们希望尽可能快地制作算法,而Hive因为速度慢而臭名昭着 - 为在Hive中运行管道获得的所有稳健性付出代价,但在trivago的团队决心突破现状的界限。
从使用MapReduce作为Hive的执行引擎迁移到Tez已经改进了很大一步,但是,SQL仍然在那里!
如果具有快速算法就可以打开不同的计算可能性,例如在白天多次运行算法以包括最近的数据,或者具有并行运行的算法的不同版本以用于A / B测试目的。

5.开发工具
现代程序员可以使用高级IDE,帮助他们在提交一行代码之前捕获错误。然而,我们正在使用普通文本编辑器(如sublime-text)编写Hive脚本,因为没有更好的工具来使用Hive开发数据管道。我们可以追求的最好方法是使用语法高亮。想象一下,将代码部署到测试环境只是为了在3小时后意识到您忘记了输出列表中的逗号。不是很好。

Spark救主
在编写数据管道的大量可能技术中,Spark成为满足团队需求的最佳选择之一:它具有数据科学领域最流行的编程语言的API,即Scala,Python,R和Java的; 文件是彻底的; 围绕技术的社区很强大; 而且,最重要的是,它是trivago中使用的Hadoop发行版的一部分,因此需要很少的努力来启动并运行以编写第一个概念证明。

R for Analyzes,Python forProd
在决定使用Spark之后,为了避开Scala或Java,我们面临许多数据科学团队迟早要做出的决定:使用R还是Python?R是一种用于临时分析和可视化的令人惊叹的语言,但它有一个很大的缺点:它与我们在trivago中的所有生产基础设施相比都很糟糕。相比之下,Python是一种相当不错的语言,它可以在不同的生产系统中无缝集成。我们决定使用Python + Spark,而且就不准备回头了。

那么PySpark如何满足团队的需求呢?

  • 单元测试:就像使用pytest一样简单。我们有两种可能的测试需求:涉及数据帧的功能,以及其他所有功能。对于第一个测试用例,我们开发了一个内部库,可以将数据帧与定义的数值精度进行比较。
  • 生产时间:依靠单元测试,我们现在对我们编写的算法的正确性更有信心。因此,我们的算法中消除了一系列编程错误,使我们能够更快地尝试新想法。
  • 表现力:由于PySpark的DataFrame API的设计,利用Python的表现力以表格格式操纵数据是很简单的。我个人对拥有一个sound库以处理日期感到欣慰。
  • 开发工具:团队决定使用PyCharm组合编写代码,Jupyter在开发阶段以交互方式运行代码。

此外,我们决定使用类型提示作为进一步减少引入错误或运行时错误的可能性的方法。使用类型提示在经验丰富的Pythonistas中是有争议的,但我们保证效忠于没有。在单元测试之后,类型提示被证明是我们采取的最佳决策之一,值得付出努力。到目前为止,这些提示正在帮助我们捕捉到我们在开发或集成测试的后期阶段偶尔会发现的错误论点。

UDF世界
定义用于处理表格数据的定制函数(通常称为用户定义函数或UDF)的可能性在Spark中真正为操纵数据框提供了新的可能性。使用UDF,时刻依靠某行数据进行推理,而不是在整个数据列上进行进行数据推理,这样就导致定义操作很简单。
例如,假设您要计算一个列,包含转化指标(定义为导致预订的点击百分比),该表包含一个名为“预订”的列和一个名为“点击”的列。在PySpark中,它看起来像这样:

import pyspark.sql.functions as pysf
result = (
    input_data
        .withColumn('conversion',
            pysf.when(pysf.col('clicks') > 0,
                      pysf.col('bookings') / pysf.col('clicks'))
                .otherwise(pysf.lit(None))
)

这相当复杂,有点难以阅读。没有使用表格数据经验的程序员将很难围绕这个计算。通过使用UDF,该算法变得更容易阅读和更健壮:

import pyspark.sql.functions as pysf
from pyspark.sql import DataFrame
from pyspark.sql.types import FloatType
def conversion_calculation(bookings: int, clicks: int) -> float:
    if clicks is None or bookings is None:
        raise ValueError('Faulty data')
    if bookings > clicks:
        raise ValueError('Bookings greater than clicks')
    if clicks == 0:
        return None
    return bookings / clicks
conversion_calculation_udf = pysf.udf(conversion_calculation, FloatType())
result = (
    input_data
        .withColumn('conversion',
            conversion_calculation_udf('bookings', 'clicks'))
)
# The rest of your code here…

除了看起来更好,使用UDF还可以更容易地为算法创建单元测试,原因有两个。首先,UDF必须是纯函数,可以说是最简单的测试,因为没有隐藏的状态需要考虑。其次,为单元测试模拟PySpark数据帧非常耗时,而为接收原始类型的函数模拟数据则相当容易。

路途上的障碍
Spark使用延迟评估计算,这意味着当转换算法应用于数据集时,Spark仅修改执行“计划”,并且该计划仅针对一小组操作(例如写入或计数)进行。这和Spark的分布式特性给我们带来了一些麻烦。

1.UDF问题
Spark 2.1中的UDF类的实现存在一个错误,它会导致算法挂起并冻结。没有错误消息。这项工作永远在继续。解决方案很简单,即尽可能地延迟UDF的创建。这不是解决方案。
这个bug的棘手部分是它在spark shell中运行代码时没有显示出来。由于我们的开发环境由一个Jupyter服务器分解Spark shell组成,我们只有在尝试在半生产环境中运行代码时才会发现此错误。

2.Joins
由于其分布式特性,在Hadoop中连接两个数据集的算法并不简单。在最坏的情况下,该算法在集群节点之间传输大量数据[ 详见此处 ]。
当连接大表和小表(几行和几列)时,可以将小表广播到所有节点,以避免数据从大表移动。Spark自动广播低于预定大小的所有表,但并不总是以最佳方式工作。基于实验,我们确定广播小表对算法的整体运行时间有巨大影响。为了最大限度地控制,我们停用了自动广播功能,而是花时间识别在join操作中扮演角色的足够小的表。然后我们手动强制广播此类表。

2.重新分区数据
本节不仅是一个障碍,还描述了一种对管道运行时性能有很大影响的优化技术。
在迁移阶段的早期,我们发现了Kay Ousterhout的[url=https://www.youtube.com/watch?v=mBk4tt7AEHU]演讲[/url]。这个讲话基本上挑战了几个深深侵入Spark社区的常见咒语。在这些咒语中,最重要的可能是认为网络数据传输对Spark管道的运行时具有显着的负面影响。Kay发现网络数据传输的影响微不足道; 相反,通过减少落后任务,即花费大量时间等待另一个操作完成的任务,可以实现最大的性能提升。

简而言之,如果群集中的大多数节点摄取大致相同数量的信息,则可以最小化拖延straggler任务。由于网络传输不应该是主要的阻止因素,我们决定重新分区,而不会在数据框架中存在大量数据。

重新划分数据不仅有助于最大限度地减少落后任务。它也有一个很好的副作用:写入HDFS的文件大小大致相同。反过来,这有助于Hadoop更有效地读/写这些文件。

下一步
从Hive到PySark的过渡在很多方面都是非常有益的。团队对Spark,Hive和Hadoop的理解显着增加。我们成为更好的程序员,甚至成为一个更好的团队。我们设法在trivago内挑战并改变对Spark的一些先入之见。但最重要的是,我们最终得到了一个我们喜欢的数据管道,用现代编程工具编写现代技术。工作还没有完成:最近我们简化了我们的持续集成管道,以自动运行单元测试,linting,类型检查和更好的自动部署。
我相信不断的重构 - 并将知识传播到其他团队的trivago中,门已经开放,我们开始考虑利用Spark的MLLib的算法。