Apache Airflow 给我留下了深刻的印象。引擎快如闪电,编写管道真的很容易。
另一个很棒的功能是它与源代码控制同步。这样,我就知道在环境中执行了哪些内容。
一个好的提示:拥有从主分支读取的暂存环境和从发布分支读取的生产环境很有用。
什么是Apache Airflow
Apache Airflow 是一个构建数据和 ML 管道的工具。这并不完全正确。您可以构建运行任务的工作流。每个任务可以是一个 shell 脚本、python 程序或其他一些执行逻辑的任务。
您可以使用 Apache Airflow 进行数据处理,但它本身并不是一个数据处理工具。相反,它确保任务按照您在一台机器或多台机器(如有必要)上指定的顺序执行。它使用一种称为非对称有向图 (DAG) 的原理。它是任务的依赖关系图。
Apache Airflow 是一个强大的工具,因为您可以很好地了解工作流中执行的任务。它足够灵活,可以在单个工作流程中处理多种类型的任务。
您不能真正在任务之间流动数据。如果您在下一个任务中需要数据,则需要将数据存储在某处。但是,Apache Airflow 可以流动元数据,因此您可以传递数据可用的位置。
现在您已经了解了 Apache Airflow 可以做什么,让我们来看看在 Kubernetes 上部署它。
在 Kubernetes 上设置 Airflow
您可以使用 Python 在本地运行 Apache Airflow。这非常适合测试工作流程。但是,您将需要更多的计算能力来用于生产场景。
要在生产环境中运行 Apache Airflow,您需要设置多个虚拟机或使用 Kubernetes 之类的工具来托管这些工具。
出于本文的目的,我将使用 Kubernetes 作为托管平台。
Apache Airflow 提供了一个 helm 图表来在 Kubernetes 上部署所需的组件。
Helm 是 Kubernetes 的包管理器。它允许您在 Kubernetes 上安装各种应用程序,而无需手动安装数十或数百个清单。
如果你以前没有使用过 Helm,你可以在他们的网站上找到一个很棒的入门指南 。
在使用 Helm 安装 Airflow 之前,您需要告诉 Helm Apache Airflow Helm 存储库。使用以下命令配置正确的 Helm 存储库:
helm repo add apache-airflow https://airflow.apache.org |
配置 Helm 存储库后,您可以在终端中使用另一个命令安装 Airflow 组件:
helm upgrade airflow apache-airflow/airflow --namespace airflow |
此命令将使用一组默认配置设置安装 Airflow。这对开发来说很好,但我们需要更多的生产。
让 Airflow 为生产做好准备
当您不指定任何设置时,您会发现密码和密钥相当弱。因此,您需要在 Airflow 安装中添加一些配置。
让我们创建一个名为的新文件values-secrets.yml并向其中添加以下设置:
webserverSecretKey: <random-string> |
我们在这个文件中配置了一组密钥:
- 首先,我们配置webserverSecretKey使会话使用唯一签名进行签名。这可以防止用户在 Airflow 的不同安装之间转移会话。
- 接下来,我们为默认用户配置密码admin。你会想在这里设置一些强大的东西。
- 然后,我们配置数据库连接的用户名和密码。Airflow 的舵图配置了数据库服务器和气流服务器。Helm 会自动为数据库服务器和气流服务器设置正确的密码,以便它可以连接。
设置机密后,您可以使用以下命令更新 Airflow 安装:
helm upgrade airflow apache-airflow/airflow --namespace airflow \ |
以前,我们只指定安装的名称和安装哪个包。现在我们添加了一组值以使用我们存储在 values-secrets.yml文件中的值。
在 Kubernetes 中安装 Airflow 后,您可以使用以下命令将端口转发到 Web 服务器来访问它:
kubectl port-forward -n airflow svc/airflow-webserver 8080:8080 |
打开网络浏览器并使用您存储在机密文件中的凭据登录。
如果一切按预期进行,您现在应该会看到 Airflow 的用户界面。让我们配置它以从存储库中加载一些 DAG。
将 GIT 存储库连接到 Airflow
Airflow 将 DAG 存储在一个名为dags. 您需要将 DAG 文件上传到此文件夹或通过其他方式同步它们。
我们很幸运,因为 Airflow 可以选择将 DAG 文件从 GIT 同步到正确的位置,以便 Airflow 运行它们。
我们需要创建另一个名为的文件values-sync.yml并向其中添加以下内容:
dags: |
您需要配置 GIT 存储库的 URL。我在示例代码中使用了 Github 存储库。
接下来,您必须告诉 Airflow DAG 文件在存储库中的位置。我已将它们存储在子文件夹中,但您可以将它们存储在根文件夹中。如果要将文件存储在根文件夹中,则需要清除subPath.
如果您使用的是私有存储库,则需要将 SSH 公钥部署到要链接的 Github 存储库,并将私钥存储在gitSshKey 设置中。确保将私钥编码为 Base64,否则同步过程无法读取它。
配置 GIT 同步的设置后,您可以使用以下命令更新安装:
helm upgrade airflow apache-airflow/airflow --namespace airflow \ |
安装完成更新后,您可以将您的第一个 DAG 提交到源代码管理。让我们快速浏览一下构建示例 DAG 以确保完整性。
为 Apache Airflow 构建 DAG
如果您已经在 Apache Airflow 中编写 DAG,那么接下来的部分就很无聊了。但是,如果您以前没有看过 DAG,我认为最好看一下 DAG。
Apache Airflow 有两种使用 DAG 构建工作流的方法。您可以完全使用在操作员上运行的任务来构建它。您还可以使用任务流 API。
我将展示 Taskflow API,因为它是上手最快的 API。
例如,我创建了一个非常基本的 DAG,它不会做任何具体的事情。在这里向您展示 DAG 的基本原理:
from airflow.decorators import dag, task |
我们需要两件事来构建 DAG。首先,我们需要创建一个标记为的函数@dag(),它将构建任务流。每个任务都是一个标有 的函数@task()。
我们将 设置schedule_interval为None所以 DAG 不会自动执行。您可以将间隔设置为每天、每小时或自定义设置。
开始日期告诉 Airflow 应该何时首次安排 DAG。我们已将此设置为过去的日期。但是您可以将其设置为未来的日期。
我们有三个任务,提取、转换和加载。这些任务必须按顺序执行。我们可以通过将每个任务转发到下一个来告诉运行时。
请注意,我们在这里调用任务,但不会执行它们。它只是告诉调度程序何时执行哪个任务。
最后,我们调用 DAG 函数来告诉运行时我们有一个新的 DAG 要加载。
它看起来像普通的 Python,但事实并非如此。每个函数都打包到一个 Kubernetes pod 中,并在 Airflow 调度 DAG 时执行。
当您将新 DAG 提交到链接存储库时,它将在您推送更改后一分钟可用。
对代码感兴趣?你可以在这里得到它并自己尝试一下