Windmill:最快的自托管开源工作流引擎


我们对 Windmill 进行了基准测试,认为它是 Airflow、Prefect 甚至 Temporal 中最快的自托管通用工作流引擎。对于 Airflow,有速度快了 10 倍!

工作流引擎编排工作人员的有向无环图 (DAG) 中定义的作业,同时尊重依赖性。

主要优点包括资源分配、并行性、可观察性和持久性。高效的工作流引擎可以快速调度工作人员的作业,并且工作人员可以快速拉取和运行作业。

由于使用 Postgresql 进行状态管理和原始 SQL 语句之间的转换,避免了最终的一致性问题,Windmill 被强调为非常快。工作被流水线化以提高效率。

网友评论:

  • Windmill 因其快速的性能、对 Discord 的积极开发人员支持以及按需触发工作流程和脚本的能力而受到赞誉。
  • 虽然速度很重要,但更重要的因素是工作流引擎是否可以支持同时运行的多个作业,并且每个作业的性能可预测。
  • 使用 Postgres 作为后端受到质疑,建议探索 MongoDB 等基于文档的数据库,以避免数据库迁移问题。
  • 由于 Windmill 的脚本功能以及与 VS Code 等工具的集成,Windmill 被认为特别适合开发人员,但非开发人员也成功将其用于内部工具。

Temporal
从某种程度上来说,Temporal不是一个工作流引擎,而是一个专门的持久执行引擎。

Temporal 实际上并不管理工作,而只管理任务队列。因此,即使在编写了临时工作流程之后,您仍然需要单独管理您的工作人员。
Windmill 还支持反应性(又名等待事件),并且也可以作为持久执行引擎。

Temporal 的功能令人惊叹,如果 Windmill 和 Temporal 之间存在重叠,那么显然在某些用例中您应该使用 Temporal 而不是 Windmill(作为 Uber 规模的微服务异步模式的骨干)

另一方面,将任意作业发送到内部集群超出了 Temporal 的范围,因为您仍然需要事先痛苦地部署“工作程序”。

我们将 Spark 或 Dagster 等分析/ETL 引擎排除在外,因为它们本身不是工作流引擎,即使它们是构建在工作流引擎之上的。

工作流引擎与作业
作业队列是工作流引擎的核心,构成任何后台作业处理的关键。已经有很多出色的队列实现,以托管服务 (SQS)、分布式可扩展服务 (Kafka) 和软件(例如:带有 rmsq 的 Redis)或库 (Orban) 的形式。它们大多足以自行使用,并且许多开发人员会通过围绕作业队列构建自己的逻辑来完全避免使用工作流引擎,从而获得满足。这类似于编写您自己的专用工作流引擎。

什么是工作流引擎,什么是“包罗万象”的工作流
首先是一些定义,工作流是由代表作业规范的节点组成的有向无环图 DAG。工作流引擎是一种分布式系统,它能接收工作流,并协调工作流在工作者身上完成,同时尊重每个作业的所有依赖约束。工作流的种类繁多,许多软件都有特定领域的工作流引擎和工作流规范(如果你是一名软件工程师,你可能已经在不知不觉中编写了一个)。

在这里,我们感兴趣的是至少能用一种主要编程语言(Python/Typescript/Go/Bash)运行任意代码的工作流。这些工作流最通用,但也最复杂,最难优化。每个节点都是一段代码,它从其他步骤(或流程输入)获取参数和数据作为输入,执行一些副作用(http 请求、计算、写入磁盘/s3),然后返回一些数据供其他步骤使用。

工作流程引擎的主要5个优势:

  • 资源分配:可充分利用集群,将每项作业分配给拥有不同资源(CPU、内存、GPU)的不同工作员,并保证工作员的全部资源都能为作业所用。
  • 并行性:并行性:当工作流的约束条件允许某些步骤并行运行(分支、for-loop)时,工作流引擎可以将这些步骤分派给多个物理上独立的工作者,而不仅仅是线程。
  • 可观察性:每个作业都有一个唯一的 ID,可以单独观察:可以检查输入、日志、输出和状态
  • 耐用性:机器死机、副作用因意外原因失效。工作流需要在接近意外事件发生时可重新启动。实现这一点的方法之一是惰性:单个操作与多个相同操作的效果相同。如果有疑问,可以重放整个流程而不产生任何后果。这通常通过日志文件和 sdk 来实现,当操作附加的唯一 ID 是日志的一部分时,sdk 会跳过副作用。另一种方法是对流程状态进行事务快照,存储每次操作后的状态。要恢复时,只需重新加载最后的状态,然后从那里开始执行。Windmill 采用的是后者,并假定在用户领域需要时可以实现幂等性。
  • 反应性:暂停流程,直到根据 webhook 或批准等事件再次恢复流程。

此外,一个包罗万象的工作流引擎应能动态注册新的可用工作人员,并能轻松部署新的工作流,将不同的工作分配给不同的工作人员,以及监控工作人员和系统本身的健康状况。

工作流引擎之上的开发者平台应能处理权限问题,使不同权限级别的用户可以运行不同的工作流,并使这些工作流能根据调用者的角色访问不同的资源。

为什么 Windmill 速度很快?
在工作流引擎中,"效率 "一词取决于:

  • 计算转换的效率,根据上次完成的工作安排新工作的效率,以及工人本身提取已安排工作并运行它们的效率
  • 在步骤之间传递数据的效率
  • 工作程序提取作业、开始执行代码并提交结果和新状态的效率

Windmill 的速度极快,因为它在这三个方面都采用了简单的设计,处处优化,最大限度地利用了 Postgresql 和 Rust。

系统设计和队列
Windmill 提供了一个二进制文件(由 Rust 编译而成),既可以作为 api 服务器运行,也可以作为 Worker 运行。Worker 和服务器都连接到 Postgresql,但彼此互不连接。服务器只公开 api 和前端。队列是在 Postgresql 本身中实现的。作业可以通过调用 API 从外部触发,API 会将新作业推送到队列中。

作业存储在 Postgresql 的两个表中:

  • queue(当作业未完成时,甚至在运行时)
  • completed_job

作业在启动时不会从队列中移除,但其字段 running 会被设置为 true。队列是通过传统的 UPDATE SKIP LOCKED 来实现的。

UPDATE queue
SET running = true
, started_at = coalesce(started_at, now())
, last_ping = now()
, suspend_until = null
WHERE id = (
    SELECT id
    FROM queue
    WHERE running = false AND scheduled_for <= now() AND tag = ANY($1)
    ORDER BY priority DESC NULLS LAST, scheduled_for, created_at
    FOR UPDATE SKIP LOCKED
    LIMIT 1
)
RETURNING *

只要每个字段都有适当的索引,Postgresql 就能做到最快的速度。这种方法的一些变种是将作业从队列中删除,而不是更新标记,或者设置一个时间,在这个时间段内,其他作业无法被拉动。在引擎盖下的原理都是一样的。

推送流程作业时,其输入、指向不可变流程定义的指针和初始流程状态(见下)都会被推送到作业行中(这是一个很大的行!)。

然后,一个 Worker 会选择流程作业,读取流程定义和流程状态,并意识到它需要推送队列中的第一步,仅此而已。这就是流程的初始转换。

如果作业是流程的一个步骤(该流程作业是一个单独的作业,并将父作业设置为该流程作业),则工作者会一次拉一个作业,将其运行至完成,并推进状态。服务器本身不进行协调,每个流程的转换都由工人自己完成。

状态
工作流引擎用有限状态机(FSM)来表示作业。常用的 4 种主要状态是

  1. 等待先决条件(收到 webhook 等事件,或完成所有相关作业)
  2. 前提条件已满足,等待工作者拉取作业并执行它
  3. 运行
  4. 完成(成功或失败)

其他状态通常是对上述 4 种状态的细化

在 Windmill 中,整个流程本身就是一个有限状态机,流程的规格和流程状态都是易于读取的结构。

流程状态完全由步进计数器和流程模块状态数组定义。有些流程模块状态更为复杂,例如与 for 循环和分支相对应的状态。需要注意的一个有趣方面是,分支或 for 循环迭代等子流程都是各自定义明确的作业,它们的指针指向触发这些作业的父流程(parent_flow)。

因此,分支和 for-loop 是一种特殊的流程状态,其中包含一个 ID 数组,指向所有已启动的子流程。当按顺序执行 for 循环/分支时,过渡包括查看流程状态,如果还有迭代,则开始下一次迭代;如果没有迭代,则完成该步骤。

这种设计的方便之处在于,每个状态转换都只是一个事务性的 Postgresql 语句。Postgresql 是 ACID,我们可以充分利用这些特性。在工作流引擎中,达到最终一致性是最难、最慢的事情。我们可以通过以下方式跳过困难的部分

  • 使用实现了 MVCC 和行级锁的 Postgresql。
  • 在工作结束时,由工作者自己完成转换

流状态是以 JSONB 的形式实现的,因此状态转换是以原始 sql 编写的,它直接更改队列中与流作业相对应的行中的流状态。这既正确又极其高效。这部分并没有真正受益于 Rust,可以用任何语言实现,甚至可以直接用 PL/SQL 实现。

最棘手的两个过渡

  • 嵌套流程的最后一步(分支的分支)
  • 并行分支/迭代

详细介绍
  • 在嵌套流程的最后一步,工作程序会更新父流程的状态,但会意识到该流程现在也已完成,因此会简单地递归到该流程的父流程,并进行流程转换。
  • 并行步骤:在启动该步骤时,所有子流程都会排队,而不是一次运行一个流程。然后,任何完成每个分支子流程最后一步的工作者都会原子式地增加一个计数器。将计数器增加到与迭代次数一样长的工作者知道,由于整个步骤已经完成,因此它需要对整个步骤进行转换。

此外,对已完成任务的处理是在后台 tokio 任务中完成的,该任务在通道中接收已完成的任务,从而在一定程度上实现了流水线作业。工人无需等待数据库完全确认作业,就能接收另一个可用作业。

Windmill 的转换速度非常快,因为它们是作为原始 Postgresql 语句实现的,而且在执行作业和更新数据库之间有管道连接。

数据传递
在 Windmill 中,有 3 种主要的数据传递方式:

  • 一个步骤的每个输入都可以是一个 javascript 表达式,可以引用任何步骤的输出

typecript、python、go、bash 的每个脚本都有其主要签名(由前端的 WASM 程序解析),可以预先计算给定步骤所需的不同输入。对于每个输入,我们都可以定义一个静态输入或一个 javascript 表达式,它可以引用任何步骤的结果,例如:results.d.foo,其中 d 是步骤的 id。复杂的 javascript 表达式将由嵌入式 v8 使用 Deno 运行时进行评估。按表达式计算大约需要 8 毫秒。

鉴于每个步骤结果本身就是一个作业,其中包含 json 格式的结果,因此只需一条 sql 语句就能检索到所需的结果。节点 id 到作业 id 的映射保存在流程作业状态中。

此外,大多数表达式都很琐碎,可以直接转换为原始 jsonb 语句:

SELECT result #> $3 as result FROM completed_job WHERE id = $1 AND workspace_id = $2"

其中 $3 为

json*path.map(|x| x.split(".").map(|x| x.to_string()).collect::<Vec<*>>())

  • 共享临时文件夹中的数据 可以将流程配置为完全在同一个 Worker 上执行。在这种情况下,每个作业的临时文件夹内都会共享一个文件夹并用同义词链接(作业在临时文件夹中启动,执行结束后会删除该文件夹)
  • 使用 s3 集成在 s3 中传递数据(该部分的具体更新将在第 5 天介绍)

工作者worker效率
在正常模式下,工作者一次拉取一个作业,识别作业使用的语言(python、typecript、go、bash、snowflake、Postgresql、mysql、mssql、bigquery),然后生成相应的运行时,然后运行作业。

与基于容器的工作流引擎相比,Worker 可以裸运行作业,而无需运行容器,从而提高了性能。不过,出于沙箱的目的,工作程序本身可以在容器内运行,并在 nsjail 沙箱中运行每个作业。

对于查询语言来说,除了建立连接外,没有冷启动。bash 没有冷启动,而 go 脚本在 AOT 中编译,然后在本地缓存二进制文件,以避免冷启动。

支持执行任意的 python 代码很难,因为我们必须支持任何带锁文件的导入。由于 Worker 不会在容器中运行 python 代码,因此必须动态处理依赖关系。为此,我们开发了一种高效的分布式缓存系统,它可以动态 pip 安装特定的一对(软件包、版本),并在运行中创建一个动态虚拟环境来执行代码。在运行代码之前,我们会对导入进行有效分析。

类似地,在 typescript 中,运行时要么是 deno,要么是 bun,这样它们就能从全局缓存中获益,永远不必重复安装相同的依赖关系。

然而,快速处理依赖关系还不足以达到最高速度,还需要冷启动生成一个 python(约 60 毫秒)或 deno/bun (约 30 毫秒)进程。在事件流情况下,逻辑本身大约需要 1 毫秒,因此冷启动是运行脚本开销的 30 倍。

幸运的是,我们最近为脚本实现了专用 Worker,它在 Worker 开始时催生一次 python 进程,然后在 while 循环中执行脚本逻辑,在 stdin 中接收作业输入,并将输出返回到 sdout。

我们将这种方法扩展为流程专用 Worker,本质上是相同的。一开始,这些 Worker 会为 Python 或 Typescript 实现的每个步骤生成一个相应的专用进程。这就彻底消除了冷启动,使 Windmill 流能够处理偶数流用例,每个 Worker 每秒可处理多达 1000 个步骤。

流的专用工作者会拉取与流相关的任何作业,然后将其路由到适当的专用进程。仍然一次只执行一个作业,但在预热流程中执行。

结论
Windmill 的速度非常快,因为它依赖于 Postgresql 和 Rust,并采用了简单的设计,能够优化每个部分,无论大小。这与用 Zig 实现的 Bun 快速的原因有点类似,都是尽可能地进行了优化。

Windmill是一个开源可自托管的无服务器运行时和平台,将代码的强大功能与低代码的速度相结合。我们将您的脚本转换为内部应用程序和可组合的流程步骤,以自动化重复的工作流程。