大规模运行 Apache Airflow 的经验教训 - shopify

22-05-24 banq

Apache Airflow是一个编排平台,支持工作流的开发、调度和监控。在 Shopify,我们已经在生产环境中运行 Airflow 两年多,用于各种工作流程,包括数据提取、机器学习模型训练、Apache Iceberg 表维护和DBT 驱动的数据建模。在撰写本文时,我们目前在 Kubernetes 上运行 Airflow 2.2,使用 Celery 执行器和 MySQL 8。

Shopify 对 Airflow 的使用在过去两年中急剧扩大。在我们最大的环境中,我们运行了 10,000 多个 DAG,代表了各种各样的工作负载。该环境平均在给定时刻运行 400 多个任务,每天执行超过 150,000 次运行。随着 Shopify 内采用率的提高,我们的 Airflow 部署所产生的负载只会增加。由于这种快速增长,我们遇到了一些挑战,包括文件访问速度慢、对 DAG(有向无环图)能力控制不足、流量水平不规则以及工作负载之间的资源争用等。

下面我们将分享一些我们学到的经验教训和我们构建的解决方案,以便大规模运行 Airflow。

1. 使用云存储时文件访问速度可能会很慢
快速文件访问对于 Airflow 环境的性能和完整性至关重要。明确定义的文件访问策略可确保调度程序能够快速处理 DAG 文件并让您的作业保持最新。

Airflow 通过反复扫描和重新解析配置的 DAG 目录中的所有文件来保持其工作流的内部表示是最新的。必须经常扫描这些文件,以保持每个工作负载的磁盘上真实数据源与其在数据库中的表示之间的一致性。这意味着 DAG 目录的内容必须在单个环境中的所有调度程序和worker之间保持一致(Airflow 提出了一些实现这一点的方法)。

经过一些实验,我们发现我们可以通过在 Kubernetes 集群中运行 NFS(网络文件系统)服务器来极大地提高 Airflow 环境的性能。

在大规模运行 Airflow 时确保快速文件访问时要考虑的另一个因素是文件处理性能。Airflow 具有高度可配置性,并提供多种方式来调整后台文件处理(例如排序模式、 并行性超时)。这使您可以根据要求优化交互式 DAG 开发或调度程序性能的环境。

2. 增加元数据量会降低Airflow 操作
在正常规模的 Airflow 部署中,至少在连续运行的头几年内,元数据量导致的性能下降不会成为问题。

然而,在规模上,元数据开始快速积累。一段时间后,这可能会开始对数据库产生额外的负载。这在 Web UI 的加载时间中很明显,在 Airflow 升级期间更是如此,在此期间迁移可能需要数小时。

经过反复试验,我们确定了 28 天的元数据保留策略,并实现了一个简单的 DAG,它使用 PythonOperator 中的 ORM(对象-关系映射)查询从包含历史数据(DagRuns、TaskInstances、Logs)的任何表中删除行、TaskRetries 等)。我们选择了 28 天,因为这为我们提供了足够的历史记录来管理事件和跟踪历史工作绩效,同时将数据库中的数据量保持在合理的水平。

不幸的是,这意味着我们的环境不支持依赖持久作业历史的 Airflow 功能(例如,长时间运行的回填)。这对我们来说不是问题,但根据您的保留期限和 Airflow 的使用情况,它可能会导致问题。
作为自定义 DAG 的替代方法,Airflow最近增加db clean了对可用于删除旧元数据的命令的支持。此命令在 Airflow 版本 2.3 中可用。

3. DAG 很难与用户和团队联系起来
在多租户环境中(尤其是在大型组织中)运行 Airflow 时,能够将 DAG 追溯到个人或团队非常重要。为什么?因为如果一项工作失败、抛出错误或干扰其他工作负载,我们的管理员可以快速联系相应的用户。

如果所有 DAG 都直接从一个存储库部署,我们可以简单地使用它git blame来追踪作业所有者。然而,由于我们允许用户从他们自己的项目中部署工作负载(甚至在部署时动态生成作业),这变得更加困难。

为了方便追溯 DAG 的来源,我们引入了 Airflow命名空间的注册表,我们将其称为 Airflow 环境的清单文件。

清单文件是一个 YAML 文件,用户必须在其中为其 DAG 注册一个命名空间。在此文件中,它们将包含有关作业所有者和源 github 存储库(甚至源 GCS 存储桶)的信息,并为其 DAG 定义一些基本限制。我们为每个环境维护一个单独的清单,并将其与 DAG 一起上传到 GCS。

4. DAG 作者有很大的权力
通过允许用户直接编写 DAG 并将其上传到共享环境,我们赋予了他们很大的权力。由于 Airflow 是我们数据平台的核心组件,它与许多不同的系统相关联,因此工作具有广泛的访问权限。虽然我们信任我们的用户,但我们仍然希望对他们在给定的 Airflow 环境中可以做什么和不可以做什么保持一定程度的控制。这在规模上尤其重要,因为 Airflow 管理员无法在所有作业投入生产之前对其进行审查。
为了创建一些基本的防护机制,我们实现了一个DAG 策略,它从前面提到的 Airflow 清单中读取配置,并通过引发AirflowClusterPolicyViolation.
根据清单文件的内容,此策略将对 DAG 文件应用一些基本限制,例如:

  • DAG ID 必须以现有命名空间的名称为前缀,以获得所有权。
  • DAG 中的任务只能将任务排入指定的 celery 队列——稍后会详细介绍。
  • DAG 中的任务只能在指定的池中运行,以防止一个工作负载占用另一个工作负载的容量。
  • 此 DAG 中的任何 KubernetesPodOperators 只能在指定的 命名空间中启动 pod ,以防止访问其他命名空间的机密。
  • DAG 中的任务只能将 pod 启动到指定的外部 kubernetes 集群集

可以扩展此策略以强制执行其他规则(例如,仅允许有限的一组运算符),甚至改变任务以符合特定规范(例如,为 DAG 中的所有任务添加特定于命名空间的执行超时) .

5. 确保负载的一致分布是困难的
为您的 DAG 计划间隔使用绝对间隔是非常诱人的——只需将 DAG 设置为运行一次timedelta(hours=1),您就可以走开,安全地知道您的 DAG 将大约每小时运行一次。但是,这可能会导致大规模的问题。

当用户合并大量自动生成的 DAG,或者编写一个 Python 文件在解析时生成许多 DAG 时,会同时创建所有的 DAGRun。这会产生大量流量,可能会使 Airflow 调度程序以及作业正在使用的任何外部服务或基础设施(例如 Trino 集群)过载。

6.资源争用点多
Airflow 内部有很多可能的资源争用点,通过一系列实验性的配置更改很容易最终陷入瓶颈。其中一些资源冲突可以在 Airflow 中处理,而其他资源冲突可能需要一些基础架构更改。

更多点击标题

猜你喜欢