这篇教程要介绍的是 Temporal,一个开源的工作流引擎,2020年首次发布。它的灵感来源于其作者之前在Uber、Azure和亚马逊的工作经验。
2. Temporal 是什么?
简单说,Temporal 让你能用它支持的编程语言(比如 Java, Go, TypeScript 等)来编写“工作流”。这些工作流负责协调一个个“活动”,“活动”就是具体要干的一件小事,比如发一封邮件、提交一个支付审批请求,或者任何其他的业务步骤。
从架构上看,Temporal 采用了微服务设计中一种著名的“Saga模式”(带协调器的变体)。这个模式的核心特点是有一个“中央协调服务”,它负责跟踪每个工作流实例的执行进度,并向处理具体“活动”的“工人”分发指令。
如果某个“工人”程序崩溃了,或者某个“活动”执行失败了,Temporal 的这个协调器会自动、透明地重试,并从断掉的地方继续执行。这在分布式系统里特别有用,因为故障几乎是不可避免的。
关于怎么部署Temporal,它提供了几种选择:
* 独立服务器:用于本地开发。
* 本地部署:可以部署在Docker、Kubernetes或者物理机上。
* 云服务:使用Temporal官方提供的托管服务,带技术支持和SLA保障等。
我们演示时会用独立服务器。其他部署方式的细节可以看官方文档。
3. 核心概念
在写代码之前,先简单了解下Temporal的核心概念。
3.1. 工作流
一个工作流定义了完成某个业务需求所需要的一系列“活动”的步骤顺序。
一个工作流实例还可以启动其他的“子工作流”。这些子工作流可以和父工作流同步或异步运行。默认情况下,如果父工作流结束了,子工作流也会被终止。当然,也可以设置成让子工作流独立继续运行。
工作流必须是“确定性的”,并且一个工作流可能运行几天、几周甚至几年才完成。
什么叫“确定性”?就是说,对于相同的输入,工作流代码必须每次都走完全相同的执行路径。
举个例子,假设我们有这么一段工作流逻辑:
java
public selectDeliveryMethod(Order order) {
int weight = deliveryActivities.calculateWeight(order); // 计算订单重量
if (weight > 100 ) {
deliveryActivities.sendByTrain(order); // 重量大于100,用火车送
}
else {
deliveryActivities.sendByAirplane(order); // 否则用飞机送
}
}
这段代码是确定性的,因为只要第一次调用calculateWeight
的结果不变,它总是执行同一条路径。但如果我们把判断逻辑改成依赖当前时间,那就失去确定性了,因为执行结果会随着一天中的不同时间而变化。
Temporal要求工作流具有确定性的原因是:它记录了所有“活动”调用的日志。当“工人”失败时,Temporal要靠这些日志来“重放”工作流的执行,从而恢复到失败前的状态。想深入了解可以看官方文档。
3.2. 活动
“活动”是工作流里真正“干重活”的单元,负责执行工作流每一步要求的具体操作。比如,一个订票工作流可能有一个“活动”是向航空公司发消息锁定座位,另一个“活动”是给客户发确认邮件,等等。
“活动”应该遵循“单一职责”原则,并且通常是短时间完成的。如果一个“活动”失败了,它直接抛出一个异常就行,让Temporal去处理这个失败。
默认情况下,因为异常而终止的“活动”会根据重试策略自动重新提交。开发者也可以通过抛出特定的ApplicationFailure
异常来告诉Temporal某个“活动”不应该被重试。
“活动”有一个重要要求:必须是“幂等的”。因为Temporal的容错机制遵循“至少一次”原则。这意味着,在从故障中恢复时,同一个“活动”可能会被执行多次。
比如,可能一个“活动”已经成功执行完了,但网络问题导致“工人”没能把结果报给Temporal服务器。服务器收不到响应,超时了,就会重新执行这个活动。在这种情况下,“活动”的代码自己要能判断出这是个重复请求,并返回和第一次执行相同的结果。
3.3. 工人
“工人”是一个独立运行的程序,里面包含了工作流和“活动”的具体实现代码。它的工作就是不停地向Temporal服务器“要活干”(轮询任务),干完后再把结果报回去。
在生产环境中,通常会有多个相同的“工人”实例同时运行,这样既能提高处理速度,也能保证一个挂了其他的还能顶上。
3.4. 任务队列
这是由Temporal服务器管理的逻辑队列,负责把工作流任务和“活动”任务路由给相应的“工人”。由于“工人”是通过轮询这些队列来获取任务的,所以任务队列也是在不同“工人”之间分配负载的主要方法。
3.5. Temporal 服务器
这是Temporal的后端服务,负责管理工作流的状态、数据持久化和协调工作。这个服务器还提供了一个Web管理界面,可以用来监控工作流和“工人”的执行情况。
4. 本地开发环境搭建
(这部分是具体的操作步骤,大意是:下载Temporal的命令行工具,用一个简单的命令就能启动一个本地开发服务器,然后可以通过Web界面查看和管理。)
5. 创建一个简单的工作流示例(Hello World)
我们来创建一个最简单的“Hello World”工作流,它只包含一个打招呼的“活动”。
5.1. 项目准备
在Java项目里添加Temporal的SDK依赖包。
5.2. 定义工作流接口
这是一个普通的Java接口,上面加了@WorkflowInterface
注解。接口里必须有一个且只有一个方法加上@WorkflowMethod
注解,这个方法就是工作流的入口。
例如,我们的HelloWorkflow
接口:
java
@WorkflowInterface
public interface HelloWorkflow {
@WorkflowMethod
String hello(String person); // 入口方法,传入人名,返回问候语
}
除了入口方法,工作流接口还可以有其他方法,用于和工作流交互,比如发信号、查询状态等,但这个简单例子用不到。
5.3. 定义和实现“活动”接口
我们还需要定义“活动”接口,它用@ActivityInterface
注解。里面的每个活动方法用@ActivityMethod
注解。
例如,打招呼的“活动”接口和实现:
java
// 接口
@ActivityInterface
public interface SayHelloActivity {
@ActivityMethod
String sayHello(String person);
}
// 实现
public class SayHelloActivityImpl implements SayHelloActivity {
public String sayHello(String person) {
return "Hello, " + person; // 实际干活的代码,非常简单
}
}
关键是,写“活动”的实现代码时,你几乎可以不用了解Temporal的具体概念,这让代码很干净,也方便做单元测试。
5.4. 实现工作流接口
工作流的实现类包含协调“活动”执行的编排逻辑。
我们这个例子很简单,就是调用那个唯一的“活动”:
java
public class HelloWorkflowImpl implements HelloWorkflow {
// 创建一个“活动”的代理/stub
private final SayHelloActivity activity = Workflow.newActivityStub(...);
@Override
public String hello(String person) {
// 工作流逻辑:调用“活动”
return activity.sayHello(person);
}
}
在初始化时,我们通过Temporal提供的工厂方法创建了“活动”的代理/stub,并设置了一些选项(比如超时时间)。
别看代码简单,当你调用activity.sayHello(person)
时,背后发生了很多事:
1. Temporal服务器会在工作流的历史记录里记下一笔:“调用了sayHello活动”。
2. 一条执行该活动的消息被放入任务队列。
3. 一个“工人”从队列里拿到这个消息,执行真正的sayHello
方法,然后把结果返回给服务器。
4. 服务器再记下一笔:“活动执行成功”。
5. 服务器把结果返回给工作流代码。
把这些复杂的过程隐藏在背后,正是Temporal带来的便利。
5.5. 设置“工人”程序
要运行例子,我们需要一个“工人”程序,它来托管工作流和“活动”的实现代码。
“工人”程序有一些样板代码,用于初始化Temporal客户端、创建“工人”实例、告诉“工人”它要处理哪些工作流和“活动”(注册),最后启动“工人”开始轮询任务队列。
这个例子假设“工人”和Temporal服务器都在本地运行。在生产环境中,连接信息和队列名通常需要通过配置文件或环境变量来设置。
6. 运行工作流
现在所有零件都准备好了,可以运行工作流了。
6.1. 启动“工人”程序
在IDE里直接运行“工人”程序的main方法,或者用Maven命令启动。启动后控制台会显示“工人”开始轮询任务队列了。
6.2. 用命令行启动工作流
最简单的方法是用Temporal的命令行工具来启动一个工作流实例,并指定任务队列和输入参数。启动后,可以用命令查询这个工作流的执行结果,应该能看到返回了"Hello, Jdon"。
6.3. 在Web界面上查看结果
在Temporal的Web管理界面上,也能看到刚刚执行完成的工作流实例,可以点击查看详细执行历史。
6.4. 用API启动工作流
在实际应用中,我们通常是通过Temporal的API来启动工作流的。主要使用WorkflowClient
这个类,它有几种创建方式(连接测试环境、本地开发服务器或正式服务器)。
拿到WorkflowClient
后,可以用它创建一个工作流的代理/stub,然后直接调用工作流的入口方法(如workflow.hello("Jdon")
),这种方式会同步等待工作流完成。
更常见的用法是异步启动工作流,然后通过返回的WorkflowExecution
信息来查询结果或与运行中的工作流交互(如取消、发送信号等)。
7. 总结
这篇文章介绍了Temporal工作流引擎的基础知识,重点是初始项目设置和核心概念。
总的来说,虽然Temporal引擎提供了非常多的功能和可能性,但在实践中使用它被设计得相当直观和简单。