Apache Airflow的10条最佳实践


最初,Airflow 有点像“超级 cron”,因此运行作业的方式与框架本身高度耦合。今天,您必须克服的最大挑战是消除调度和作业之间的耦合。

1)Airflow是一个编排框架,而不是一个执行框架:
对于您的工作,您应该只使用在单独的处理解决方案中触发 Airflow 计算的运算符,例如:

  • 一个容器:K8S、GKE、EKS、ECS、Cloud RUN……
  • 无服务器函数:AWS Lambda、云函数
  • Spark工作:EMR、DATAPROC …
  • 一个查询:BigQuery、Redshift……

在 Airflow 中直接运行您的作业(执行大量 CPU、内存或 IO 操作)会给您的 airflow_workers 带来压力。Airflow 应该只运行操作员启动和断言在单独的处理解决方案中运行的任务。

2) 避免使用 PythonOperator 进行工作:
如果您使用 PythonOperator 则只运行非常非常简单的代码,它必须只执行简单的 IO 操作(如转换小型 XCOM),否则使用规则 1 的运算符运行您的作业。
不要使用 VirtualVenvOperator 或 BashOperator 或 DockerOperator(除非您在 Airflow 工作人员的另一台主机上触发运行),您应该使用 KubernetesPodOperator 、 EmrCreateJobFlowOperator 等操作员……

3) 明智地选择运算符:

3.1) 在创建之前检查现有的运算符
在 99% 的情况下,您不应该创建新的 Airflow 运算符,请仔细查看现有运算符。检查您尝试执行的操作是否可以与现有运算符的组合(例如,第一个任务是 SubmitXXOperator,第二个任务是 SensorXXOperator)
如果你真的觉得需要一个新的运算符,请在 GitHub 或 slack 上询问 Airflow 社区,在大多数情况下,他们会向你展示现有运算符的方法。
如果您绝对需要自定义现有的运算符,扩展现有的类,不要从零开始重新创建所有内容。
在我目前的公司中,我们只创建了 2 个运算符

3.2) 不要使用每个现有的运算符
一些开源Airflow 运算符/算子只是帮手(直接运行python代码而不触发操作到外部解决方案)
例如SimpleHttpOperator或GCSToGoogleDriveOperator直接使用 python 代码运行操作。
如果您不使用 KubernetesExecutor 或 CeleryKubernetesExecutor,那么此代码将在Airflow 工作者内部运行,在这种情况下,最好使用 KubernetesPodOperator(运行容器执行所需的操作)

4)不要在你的Airflow部署中安装任何自定义的依赖。
唯一允许的依赖是Airflow社区支持的供应商apache-airflow-providers-XXX

如:

- apache-airflow-providers-cncf-kubernetes
- apache-airflow-providers-google
- apache-airflow-providers-slack

因为只有这些包是Airflow社区确保与Airflow本身良好的兼容性。
安装任何其他的依赖将使部署处于危险状态,并可能导致在升级时出现依赖地狱。

我不建议安装不在官方列表中的自定义Airflow提供商。
- https://pypi.org/project/airflow-dbt/
- https://github.com/great-expectations/airflow-provider-great-expectations

5)Airflow不是一个数据线的解决方案。
Airflow是一个运行在操作者中定义的任务的调度器,目前Airflow确实具有非常有限的(测试版)行进能力。这些功能允许Airflow与使用Open Lineage标准的第三方解决方案(如Marquez)集成。
无论多么诱人,你都应该绝对避免重新实现一个自定义/自制的解决方案。

6) Airflow不是一个数据存储解决方案。
Airflow操作者可以返回数据,Airflow将把这些数据存储在其内部数据库airflow_db中(由传统的RDBS支持,如Postgresql)。我们称存储在airflow_db中的数据为XCOM。

但是,airflow_db不应该存储自定义数据,而只是存储非常小的元数据(比如我们的BigQueryToXCOMOperator通常会返回一个单一的值,如时间戳)。

最好的做法是不从操作符中返回数据。如果你有一个特殊的情况,操作者必须返回数据(像一个复杂的json的多行),你需要使用一个自定义的xcom_backend来写入数据,而不是在airflow_db,而是在另一个地方(如S3或GCS)。

你的自定义xcom_backend必须只用于你明确选择的任务(请注意,默认情况下,Airflow会对所有的XCOM使用一个xcom_backend)。

在所有情况下,如果你用外部处理方案(如KubernetesPodOperator)运行一个作业,那么该作业就有责任将结果写入存储(如S3、GCS),其路径取决于调度器提供的上下文。

7) 不要把秘密放在Airflow的变量或连接中。
Airflow可以存储DAG可以通过模板访问的变量,因此DAG在运行时总是会检索到该变量的最新版本,对于连接也是如此。

但是,如果一个变量或连接存储了一个秘密值(如私钥),不要直接在Airflow中注册这个变量或连接,你必须在一个密钥存储(Vault、GCP秘密管理器...)中注册该变量或连接,并在Airflow中使用一个秘密后台。

你的自定义secret_backend必须只检索你定义的变量或连接(请注意,默认情况下,Airflow会首先尝试为自定义secret_backend中的所有变量和连接找到秘密,并对正常的Airflow变量或连接发出警告日志)。

8)你的作业必须在最大程度上与调度无关。
如果你重新运行一个作业,它必须是empotent的,你也必须能够在Airflow之外运行你的作业。

例如:如果你有一个抓取API并将结果写入S3的小作业,你必须能够在你电脑上的容器中运行该作业,因为该作业从env_vars或args中获取所有的上下文。这确保了与气流和实际计算的完全解耦(调度器和任务实现之间有明确的界限)。

一个好的做法是,在一个独立的仓库中编码你的作业,而不是你的dags(因为dags只需要知道实现作业的容器的名称和版本)。

然后在作业库中:为每个作业关联一个小的docker-compose,以便能够在一个固定的上下文中运行该作业。

9) 不要将Airflow的不同元素部署在一起。
如果你用官方的HELM文件在K8S中部署Airflow,这是默认行为。

但是,如果你选择手动部署,那么你必须隔离airflow_scheduler、airflow_webserver和airflow_workers,这样你就可以根据你的用例来扩展它们。

10)LocalExecutor和KubernetesExecutor并不容易。
LocalExecutor只能垂直扩展,因为你将在与调度器相同的位置运行任务(有多个调度器的HA并不意味着你应该有N个调度器来水平扩展你的Airflow堆栈)。因此,对于生产环境来说,不建议使用这种方法。

KubernetesExecutor将直接使用Kubernetes,这意味着你将直接给K8S集群带来压力(每个任务都有一个新的pod),使用该执行器更难检测和解决扩展问题。

CeleryExecutor比KubernetesExecutor更容易理解和操作,而且用该执行器可以消除Airflow和K8S之间的耦合。