Apache Airflow十条最佳实践


Apache Airflow项目有点像“超级 cron”,因此运行作业的方式与框架本身高度耦合。今天,您必须克服的最大挑战仍然是调度和作业之间的耦合。
您可以仅根据要运行的 dag 和任务的数量来扩展您的 Airflow 部署
 
1)Airflow是一个编排框架,而不是一个执行框架:
对于您的工作,您应该只使用在单独的处理解决方案中触发 Airflow 计算的运算符,例如:

  • 一个容器:K8S、GKE、EKS、ECS、Cloud RUN……
  • 无服务器函数:AWS Lambda、云函数
  • spark worker:EMR、DATAPROC …
  • 一个查询:BigQuery、Redshift……

在 Airflow 中直接运行您的作业(执行大量 CPU、内存或 IO 操作)会给您的 airflow_workers 带来压力。Airflow 应该只运行操作员启动和断言在单独的处理解决方案中运行的任务。
 
2) 避免使用 PythonOperator 进行工作:
如果您使用 PythonOperator 则只运行非常非常简单的代码,它必须只执行简单的 IO 操作(如转换小型 XCOM),否则使用规则 1 的运算符运行您的作业。
不要使用 VirtualVenvOperator 或 BashOperator 或 DockerOperator(除非您在 Airflow 工作人员的另一台主机上触发运行),您应该使用 KubernetesPodOperator 、 EmrCreateJobFlowOperator 之类的运算符……
 

3)在创建之前检查现有的operators:
在 99% 的情况下,您不应该创建新的 Airflow operator,请仔细查看现有operator。检查您尝试执行的操作是否可以与现有operator的组合(例如,第一个任务是 SubmitXXOperator,第二个任务是 SensorXXOperator)
如果你真的觉得需要一个新的operator,请在 GitHub 或 slack 上询问 Airflow 社区,在大多数情况下,他们会向你展示现有operator的方法。
如果您绝对需要自定义现有的运算符,扩展现有的类,不要从零开始重新创建所有内容。
在我目前的公司中,我们只创建了 2 个operator
 
4) 不要在您的 Airflow 部署中安装任何自定义依赖项:
唯一允许的依赖项是 Airflow 社区支持的提供程序apache-airflow-providers-XXX
喜欢 :
- apache-airflow-providers-cncf-kubernetes- apache-airflow-providers-google- apache-airflow-providers-slack
因为这些是 Airflow 社区确保与 Airflow 本身具有良好兼容性的唯一软件包。安装任何其他依赖项将使部署处于危险状态,并可能在升级时导致依赖地狱。
我不建议安装不属于官方列表的自定义 Airflow 提供程序:

  • - https://pypi.org/project/airflow-dbt/
  • - https://github.com/great-expectations/airflow-provider-great-expectations

  

5)Airflow不是一个数据lineage的解决方案。
Airflow是一个运行在operator中定义的任务的调度器,目前Airflow确实有非常有限的(测试版)lineage功能。这些功能允许Airflow与使用Open Lineage标准的第三方解决方案(如Marquez)集成。
你应该绝对避免重新实现一个自定义/自制的解决方案,无论多么诱人。
 
6)Airflow不是一个数据存储解决方案。
Airflow operator可以返回数据,Airflow将把这些数据存储在其内部数据库airflow_db中(由传统的RDBS支持,如Postgresql)。我们称存储在airflow_db中的数据为XCOM。
但是,airflow_db不应该存储自定义数据,而只应该存储非常小的元数据(比如我们的BigQueryToXCOMOperator通常会返回一个单一的值,如时间戳)。
最好的做法是不从operator那里返回数据。如果你有一个特殊的情况,operator必须返回数据(像一个复杂的json的多行),你需要使用一个自定义的xcom_backend来写入数据,而不是在airflow_db,而是在另一个地方(如S3或GCS)。
你的自定义xcom_backend必须只用于你明确选择的任务(注意,默认情况下,一个xcom_backend将被Airflow用于所有XCOM)。
在所有情况下,如果你用外部处理方案(如KubernetesPodOperator)运行一个作业,那么该作业负责将结果写入存储(如S3、GCS),其路径取决于调度器提供的上下文。
 
7)不要把安全密钥放在Airflow的变量或连接中。
Airflow可以存储DAG可以通过模板访问的变量,因此DAG在运行时总是会检索到该变量的最新版本,对于连接也是如此。
但是,如果一个变量或连接存储了一个密钥(如私钥),不要直接在Airflow中注册这个变量或连接,你必须在一个密钥存储(Vault、GCP秘密管理器...)中注册该变量或连接,并在Airflow中使用一个密钥后台。
你的自定义secret_backend必须只检索你定义的变量或连接(注意,默认情况下,Airflow会首先尝试为自定义secret_backend中的所有变量和连接找到秘密,并对正常的Airflow变量或连接发出警告日志)。
  
8)你的作业必须最大限度地不受调度的影响。
如果你重新运行一个作业,它必须是empotent的,而且你还必须有能力在Airflow之外运行你的作业。
例如:如果你有一个抓取API并将结果写入S3的小作业,你必须能够在你电脑上的容器中运行该作业,因为该作业从env_vars或args中获取所有的上下文。这确保了实际计算与Airflow的完全解耦(调度器和任务实现之间的界限清晰)。
一个好的做法是,在一个独立的仓库中编码你的作业,而不是你的dags(因为dags只需要知道实现作业的容器的名称和版本)。
然后在作业库中:为每个作业关联一个小的docker-compose,以便能够在一个固定的环境中运行该作业。
 
9) 不要把Airflow的不同元素部署在一起。
如果你在K8S中用官方的HELM文件部署Airflow,这是默认行为。
但是,如果你选择手动部署,那么你必须隔离airflow_scheduler、airflow_webserver和airflow_workers,这样你就可以根据你的用例来扩展它们。
 
10) LocalExecutor和KubernetesExecutor并不容易。
LocalExecutor只具有纵向扩展性,因为你将在与调度器相同的位置运行任务(拥有多个调度器的HA并不意味着你应该拥有N个调度器来横向扩展你的Airflow堆栈)。因此,对于生产环境来说,不建议使用这种方法。
KubernetesExecutor将直接使用Kubernetes,这意味着你将把压力直接放在K8S集群上(每个任务都有一个新的pod),使用该执行器更难检测和解决扩展的问题。
CeleryExecutor比KubernetesExecutor更容易理解和操作,而且用该执行器可以消除Airflow和K8S之间的耦合。