如何使用 Kubernetes 和 GIT 部署 Airflow ?


Apache Airflow 给我留下了深刻的印象。引擎快如闪电,编写管道真的很容易。
另一个很棒的功能是它与源代码控制同步。这样,我就知道在环境中执行了哪些内容。
一个好的提示:拥有从主分支读取的暂存环境和从发布分支读取的生产环境很有用。

什么是Apache Airflow
Apache Airflow 是一个构建数据和 ML 管道的工具。这并不完全正确。您可以构建运行任务的工作流。每个任务可以是一个 shell 脚本、python 程序或其他一些执行逻辑的任务。
您可以使用 Apache Airflow 进行数据处理,但它本身并不是一个数据处理工具。相反,它确保任务按照您在一台机器或多台机器(如有必要)上指定的顺序执行。它使用一种称为非对称有向图 (DAG) 的原理。它是任务的依赖关系图。
Apache Airflow 是一个强大的工具,因为您可以很好地了解工作流中执行的任务。它足够灵活,可以在单个工作流程中处理多种类型的任务。
您不能真正在任务之间流动数据。如果您在下一个任务中需要数据,则需要将数据存储在某处。但是,Apache Airflow 可以流动元数据,因此您可以传递数据可用的位置。
现在您已经了解了 Apache Airflow 可以做什么,让我们来看看在 Kubernetes 上部署它。

在 Kubernetes 上设置 Airflow
您可以使用 Python 在本地运行 Apache Airflow。这非常适合测试工作流程。但是,您将需要更多的计算能力来用于生产场景。

要在生产环境中运行 Apache Airflow,您需要设置多个虚拟机或使用 Kubernetes 之类的工具来托管这些工具。

出于本文的目的,我将使用 Kubernetes 作为托管平台。

Apache Airflow 提供了一个 helm 图表来在 Kubernetes 上部署所需的组件。

Helm 是 Kubernetes 的包管理器。它允许您在 Kubernetes 上安装各种应用程序,而无需手动安装数十或数百个清单。

如果你以前没有使用过 Helm,你可以在他们的网站上找到一个很棒的入门指南 。

在使用 Helm 安装 Airflow 之前,您需要告诉 Helm Apache Airflow Helm 存储库。使用以下命令配置正确的 Helm 存储库:

helm repo add apache-airflow https://airflow.apache.org
helm repo update

配置 Helm 存储库后,您可以在终端中使用另一个命令安装 Airflow 组件:

helm upgrade airflow apache-airflow/airflow --namespace airflow

此命令将使用一组默认配置设置安装 Airflow。这对开发来说很好,但我们需要更多的生产。

让 Airflow 为生产做好准备
当您不指定任何设置时,您会发现密码和密钥相当弱。因此,您需要在 Airflow 安装中添加一些配置。

让我们创建一个名为的新文件values-secrets.yml并向其中添加以下设置:

webserverSecretKey: <random-string>
defaultUser.password: <your-password>
data:
  metadataConnection:
    user: postgres
    pass: <your-password>

我们在这个文件中配置了一组密钥:
  • 首先,我们配置webserverSecretKey使会话使用唯一签名进行签名。这可以防止用户在 Airflow 的不同安装之间转移会话。
  • 接下来,我们为默认用户配置密码admin。你会想在这里设置一些强大的东西。
  • 然后,我们配置数据库连接的用户名和密码。Airflow 的舵图配置了数据库服务器和气流服务器。Helm 会自动为数据库服务器和气流服务器设置正确的密码,以便它可以连接。

设置机密后,您可以使用以下命令更新 Airflow 安装:

helm upgrade airflow apache-airflow/airflow --namespace airflow \
    -f values-secrets.yml

以前,我们只指定安装的名称和安装哪个包。现在我们添加了一组值以使用我们存储在 values-secrets.yml文件中的值。

在 Kubernetes 中安装 Airflow 后,您可以使用以下命令将端口转发到 Web 服务器来访问它:

kubectl port-forward -n airflow svc/airflow-webserver 8080:8080

打开网络浏览器并使用您存储在机密文件中的凭据登录。

如果一切按预期进行,您现在应该会看到 Airflow 的用户界面。让我们配置它以从存储库中加载一些 DAG。

将 GIT 存储库连接到 Airflow
Airflow 将 DAG 存储在一个名为dags. 您需要将 DAG 文件上传到此文件夹或通过其他方式同步它们。

我们很幸运,因为 Airflow 可以选择将 DAG 文件从 GIT 同步到正确的位置,以便 Airflow 运行它们。

我们需要创建另一个名为的文件values-sync.yml并向其中添加以下内容:

dags:
    gitSync:
        repo: <repo-url>
        branch: main
        depth: 1
        enabled: true
        subPath: dags
        sshKeySecret: airflow-ssh-secret
extraSecrets:
  airflow-ssh-secret:
    data: |
      gitSshKey: '<your-key>'

您需要配置 GIT 存储库的 URL。我在示例代码中使用了 Github 存储库。

接下来,您必须告诉 Airflow DAG 文件在存储库中的位置。我已将它们存储在子文件夹中,但您可以将它们存储在根文件夹中。如果要将文件存储在根文件夹中,则需要清除subPath.

如果您使用的是私有存储库,则需要将 SSH 公钥部署到要链接的 Github 存储库,并将私钥存储在gitSshKey 设置中。确保将私钥编码为 Base64,否则同步过程无法读取它。

配置 GIT 同步的设置后,您可以使用以下命令更新安装:

helm upgrade airflow apache-airflow/airflow --namespace airflow \
    -f values-secrets.yml -f values-override.yml

安装完成更新后,您可以将您的第一个 DAG 提交到源代码管理。让我们快速浏览一下构建示例 DAG 以确保完整性。

为 Apache Airflow 构建 DAG
如果您已经在 Apache Airflow 中编写 DAG,那么接下来的部分就很无聊了。但是,如果您以前没有看过 DAG,我认为最好看一下 DAG。

Apache Airflow 有两种使用 DAG 构建工作流的方法。您可以完全使用在操作员上运行的任务来构建它。您还可以使用任务流 API。

我将展示 Taskflow API,因为它是上手最快的 API。

例如,我创建了一个非常基本的 DAG,它不会做任何具体的事情。在这里向您展示 DAG 的基本原理:

from airflow.decorators import dag, task
import pendulum


@dag(
    schedule_interval=None,
    start_date=pendulum.datetime(2021, 1, 1, tz='UTC'),
    catchup=False,
    tags=['sample']
)
def sample_etl():
    @task()
    def extract():
        pass

    @task()
    def transform():
        pass

    @task()
    def load():
        pass

    extract() >> transform() >> load()

sample_dag = sample_etl()

我们需要两件事来构建 DAG。首先,我们需要创建一个标记为的函数@dag(),它将构建任务流。每个任务都是一个标有 的函数@task()。

我们将 设置schedule_interval为None所以 DAG 不会自动执行。您可以将间隔设置为每天、每小时或自定义设置。

开始日期告诉 Airflow 应该何时首次安排 DAG。我们已将此设置为过去的日期。但是您可以将其设置为未来的日期。

我们有三个任务,提取、转换和加载。这些任务必须按顺序执行。我们可以通过将每个任务转发到下一个来告诉运行时。

请注意,我们在这里调用任务,但不会执行它们。它只是告诉调度程序何时执行哪个任务。

最后,我们调用 DAG 函数来告诉运行时我们有一个新的 DAG 要加载。

它看起来像普通的 Python,但事实并非如此。每个函数都打包到一个 Kubernetes pod 中,并在 Airflow 调度 DAG 时执行。

当您将新 DAG 提交到链接存储库时,它将在您推送更改后一分钟可用。


对代码感兴趣?你可以在这里得到它并自己尝试一下