Airbyte如何使用Temporal扩展工作流程编排?


Airbyte 的作用是提供一种在源和目标之间执行数据同步的简单方法。工作流程编排很重要,因为它确保数据按照客户指定的频率同步。
在本文中,我们将讨论能够支持 Airbyte 数据同步的编排器所必需的特性以及我们如何利用 Temporal。然后,我们将详细介绍如何构建长期运行的 Temporal 工作流,以促进灵活的内部同步调度。我们详细介绍了使我们能够实现这些工作流的特定于时间的功能。 

对于 Airbyte 的同步工作流程,我们的编排系统需要能够:

  • 将工作分配给 Docker Compose 和 Kubernetes 上的节点。在一个或另一个环境上运行的配置之间应该有最小的差异。
  • 使用时间间隔和 crontab 灵活地安排工作流程,同时可以通过可预测的行为动态启用、禁用或更改计划。 
  • 跨多个节点同时横向扩展至数千个工作流。

采用多种方法的工作流编排器有许多选项。大多数工作流编排器处理长时间运行的异步工作流,这些工作流可以处理状态更新和重试,同时保持执行历史记录。

然而,Temporal也使我们能够以编程方式配置工作流程。虽然我们的大多数工作流程代表了数据同步,但有些执行的动作,如查找数据库的模式。这些动作会带来延迟,Airbyte的用户在用户界面上会感觉到这些延迟。Temporal允许我们以最小的开销配置和执行工作,这在Airflow和其他几个常见的协调器中是不可能的。

我们正在寻找的一个特点是能够轻松地将该工具嵌入到我们的Airbyte的OSS版本中。Airbyte在Docker Compose和Kubernetes上都得到了支持,Airbyte的操作者在维护分布式系统方面有广泛的经验。我们避免了像Argo这样伟大的Kubernetes专用工具,以防止Docker Compose和Kubernetes系统之间的Airbyte内部同步编排出现分歧。Temporal提供各种部署选项,在我们所有的部署平台上发挥作用。

Airbyte的API也使用Postgres作为数据库。由于Temporal可以使用Postgres作为备份存储,所以我们的OSS用户群不需要额外的容器依赖,即使我们在逻辑上将Airbyte的配置数据库和Temporal的内部数据库分开。对于生产环境,我们建议分别使用不同的数据库。

Airbyte平台的绝大部分代码也是用Java编写的。虽然不是严格的要求,但我们更倾向于使用高质量的Java SDK与协调器进行交互。Temporal提供了一个富有表现力的Java SDK,使用代码来指定工作流和活动的行为。你只需编写代码就能得到一个工作流

在可扩展性方面,我们希望能够通过我们的调度和工人执行基础设施处理同时运行的成千上万的同步。Temporal能够解决这个问题,它允许在不同的服务器组件之间进行独立的扩展,并提供了一个简单的模型,将工作者节点连接到独立的任务队列,这使得在像Kuberentes这样的多节点系统上可以轻松扩展。

在更深入地了解我们的解决方案的架构之前,我们将解释Temporal工作流模型的一些基本构建块。

Temporal概述
Temporal是一个工作流引擎,它处理被分割成步骤或 "活动 "的工作流的状态管理。我们使用Java SDK连接到Temporal服务,该服务提供了一个API,用于注册可以运行Temporal活动的工作者,启动异步工作流,向正在运行的工作流发送信号,并查询工作流的状态。这项服务很容易被配置为在Docker或Kubernetes上运行,并使用Postgres或Cassandra等支持数据存储。

如果你来自一个面向批处理的工作流引擎,如Airflow,许多核心概念类似DAGs、子DAGs和sensors。

Java SDK允许表达工作流的概念(Airflow中的DAG),它以定义的顺序执行一些活动。这是任何Temporal运行的入口。工作流可以接收信号和执行查询,这将在下面解释。工作者节点与Temporal服务器通信,以确定哪些工作流应该在单个节点上执行。这是通过在创建时将某些工作流分配给命名的任务队列来协调的,这些任务队列可以在单个节点上注册。

活动(Airflow中的任务)是Temporal工作流中的步骤,代表一些操作或操作。这些操作可以包含任何逻辑,但由于它们代表了可重试的工作单位,它们通常被用来分离行为,如对外部数据库或API的调用。一个活动的输出被序列化,并由Temporal的支持数据存储以及活动和工作流的任何相关状态信息一起存储。

也可以有子工作流(Airflow 中的 subDAGs),它是由其他工作流启动的工作流。它可以配置子工作流程的行为,根据父工作流程失败或被取消的情况,失败或继续。

有两种方法允许与工作流程进行互动:信号和查询。信号允许修改一个工作流的内存状态。这些方法是异步的,不能保证立即运行。如果一个Temporal工作流依赖于一个被信号修改的条件,它将智能地重新评估该条件,以决定工作流是否应该继续。查询只是允许读取一个工作流的状态。我们通常使用这个功能来检查长期运行的工作流的状态。

关于Temporal是什么以及它的功能的更多信息,我们建议查阅Temporal网站的介绍和其他指南。

如何组织Temporal工作流
Airbyte的主要Temporal工作流用于将源(API、DB、文件等...)之间的数据同步到目的地(DB、仓库、云存储等...)。为了正确运行同步,我们需要按照特定的顺序执行以下操作。

  • 获取调度配置
  • 等待适当的时间
  • 获取作业配置
  • 更新作业状态,报告它正在运行
  • 运行同步
  • 更新作业状态,报告失败或成功的运行情况

所有这些不同的步骤都被隔离在一个专门的Temporal动作中。一个动作是以下内容之一:一个Temporal活动、一个子工作流或一个Temporal工作流内部方法。在下面的章节中,我们将提供所有三种类型的行动的例子。

我们决定使用一个永久运行的Temporal工作流来表示每个数据同步。为了限制永久运行的工作流的事件历史的大小,我们使用了一个叫做ContinueAsNew的时态Temporal概念,在执行运行后从第一步重新开始。

这种设计有利于完全定制我们支持的调度类型,因为我们在运行前等待的时间是在一个活动中确定的。这将使Airbyte支持基于cron的工作流调度、停工期和基于时间间隔的调度。此外,Temporal通过在冷库中缓存状态,并在需要时才加载到内存中,使长时间运行的 "等待 "操作变得高效。

设计一个同步工作流程
Airbyte中的同步分为3个活动。

  • 数据复制
  • 数据归一化(可选)
  • dbt转换(可选)

所有这些活动都是长期运行的操作,根据数据量和连接器的吞吐量,可能需要几分钟到几个小时。所有这些活动都与在几毫秒内运行的获取时间表或用数据库I/O报告状态的非常短的活动有很大不同。为了在一个单独扩展的工人池中隔离长运行,我们在子工作流中运行这些活动。这个子工作流并不独立于父工作流;如果父工作流被终止或取消了,子工作流也会被终止。这使得我们可以使用父工作流级别的唯一性约束来确保每个连接只能运行一个同步。

实现一个Temporal 工作流
我们正在使用Temporal Java SDK来定义一个工作流。它提供了一组注释,用于装饰一个接口,让Temporal知道如何运行我们正在编写的不同方法。在接口层面上有一个主要的注解。@WorkflowInterface。有3个方法注解,我们正在使用。

  • @WorkflowMethod。每个工作流只能有一个,这个方法包含了对活动调用的实现排序。这是运行工作流的入口。
  • @SignalMethod。装饰了一个方法,该方法将作为信号被临时性调用。它不执行I/O,只更新Temporal工作流的内存状态。
  • @QueryMethod: 装饰了一个将被temporal调用作为查询的方法。它是一个同步方法,将返回内存中Temporal状态的某些部分。

@WorkflowInterface
public interface ConnectionManagerWorkflow {

 @WorkflowMethod
 void run(ConnectionUpdaterInput connectionUpdaterInput);

 @SignalMethod
 void submitManualSync();

 @Data
 @NoArgsConstructor
 @AllArgsConstructor
 class JobInformation {

   private long jobId;
   private int attemptId;

 }

 @QueryMethod
 JobInformation getJobInformation();
}

实现一个Temporal的活动
这与工作流的实现非常相似。Temporal提供了一组注解,允许你装饰一个接口。然后,它将把这些方法识别为一个活动,并能够确保它们在每个工作流运行中只运行一次。比如说:

@ActivityInterface
public interface ConfigFetchActivity {

 @Data
 @NoArgsConstructor
 @AllArgsConstructor
 class ScheduleRetrieverInput {

   private UUID connectionId;

 }

 @Data
 @NoArgsConstructor
 @AllArgsConstructor
 class ScheduleRetrieverOutput {

   private Duration timeToWait;

 }

 @ActivityMethod
 ScheduleRetrieverOutput getTimeToWait(ScheduleRetrieverInput input);

}


然后,它需要在工作流程中注册。这样做将使你能够把它与最理想的工人库联系起来。

private final ConfigFetchActivity configFetchActivity =
   Workflow.newActivityStub(ConfigFetchActivity.class,
ActivityConfiguration.SHORT_ACTIVITY_OPTIONS);

触发同步运行
有时Airbyte用户想手动触发同步。我们使用Temporal的信号方法来传输这个请求。信号方法是一种与Temporal通信并修改其内部状态的异步方式。修改状态后,Temporal将有效地重新评估等待条件,并允许运行子工作流。

取消同步运行
取消是一个很好的例子,说明我们可以结合不同的Temporal原语来安全地处理各种协调器的状态。具体来说,我们将讨论如何使用以下时态基元:子工作流、信号方法、取消范围和继续为新。

取消是由外部用户触发的,当当前运行需要被取消时,该用户通过Temporal服务向工作流发送信号。这可以通过Temporal API或Temporal CLI完成。为了能够处理信号,我们首先需要在我们的工作流实现中声明并实现信号方法。在Java SDK中,Temporal为此使用了一个注解,你可以在这里看到:

@WorkflowInterface
public interface ConnectionManagerWorkflow {

 @WorkflowMethod
 void run(ConnectionUpdaterInput connectionUpdaterInput);

 @SignalMethod
 void cancelJob();
}

信号方法signal method 的实现非常简单,它只是将作业的状态设置为取消的状态,然后取消cancellation Scope(我们将在下面解释)。然后,该状态将被用于报告我们内部数据库对同步的表示中的正确状态。

@Override
public void cancelJob() {
 workflowState.setCancelled(true);
 cancellableSyncWorkflow.cancel();
}


cancellation Scope允许您使用将在工作流取消时运行的回调方法。如果cancellation Scope取消作用域的取消方法被调用,它将停止回调方法和其余活动的执行。然后,取消会被传播到子工作流中。这使我们能够确保在父工作流被取消时,同步被正确地停止。这就是如何创建一个取消作用域。

Workflow.newCancellationScope(() -> {
  // Implementation here
}

这就是如何取消作用域:
cancellableSyncWorkflow.cancel();

如前所述,这将取消父工作流并将取消传播到子工作流中。我们已经配置了ParentClosePolicy来传播取消,而不是突然终止工作流。我们使用的是PARENT_CLOSE_POLICY_REQUEST_CANCEL,它允许我们优雅地终止子工作流。

总结
在这篇文章中,我们高屋建瓴地介绍了Airbyte在协调数据同步工作流时面临的挑战,以及我们选择使用Temporal建立一个可定制的解决方案的原因。我们还更深入地展示了一些Temporal的功能,我们使用这些功能是为了处理Airbyte公司协调数据同步的广泛需求。如果你有兴趣阅读我们调度的整个实现过程,源代码是开源的,可以在 here 获得。