Dapr 工作流实战:PubSub + 持久化编排 = 永不掉链的分布式业务引擎

Dapr Workflows 通过持久化执行与 Activity 编排,让复杂业务流程具备自动恢复、重试、事件等待能力,结合 PubSub 实现真正健壮的事件驱动架构。

用户在你的 App 上下单成功了,司机也接单了,可突然服务挂了——结果订单状态卡在“已接单”,没人通知乘客,没人计算车费,更没人回滚操作。传统微服务里靠 PubSub(发布订阅)传递消息确实解耦了系统,但一遇上多步骤、需协调、要容错的复杂业务,PubSub 就显得力不从心。

这时候,你真正需要的不是“消息通道”,而是一套能记住自己走到哪一步、重启后还能接着干、失败了还能重试甚至回滚的“数字大脑”——这就是 Dapr Workflows 要解决的核心问题。

从“消息传递”到“持久化编排”:为什么 PubSub 不够用?

很多人以为,只要用了 Kafka、RabbitMQ 或 Dapr 自带的 PubSub,就能搞定所有事件驱动场景。但现实是:消息可以传递,状态却无法追踪。比如网约车平台中,司机接单后要验证资质、计算预估车费、通知乘客、更新订单状态……这些步骤环环相扣,任一环节失败都可能造成资金损失或用户体验崩坏。传统做法是在数据库里手动记录状态、写一堆 retry 逻辑、为每个服务定制补偿机制——结果代码臃肿、测试困难、上线即埋雷。

而 Dapr Workflows 的出现,就像给你的业务流程装上了“自动驾驶仪”。它通过“持久化执行”(Durable Execution)机制,自动保存每一步的执行结果。哪怕服务进程被 kill、服务器宕机、容器被调度,工作流也能无缝恢复,从断点继续执行。关键在于:它用“重放”(Replay)代替“重跑”——当工作流恢复时,引擎会从头执行代码,但已成功完成的 Activity(活动任务)不会重复调用,而是直接复用之前的结果。这就要求你的工作流逻辑必须是“确定性的”(Deterministic),不能有随机数、外部 I/O 或时间依赖。

代码即流程:用 Java 写出像讲故事一样的业务编排

Dapr Workflows 最让开发者上头的一点,就是它把复杂流程写成了“顺序代码”。你不需要画状态机图、不需要写 XML 配置、更不需要学新 DSL。在 Spring Boot 里,你只需要实现一个 Workflow 接口,用熟悉的 lambda 表达式组织逻辑,再把实际操作(比如调用 REST API、写数据库)封装成 Activity 组件。比如这个网约车处理工作流:

@Component
public class RideProcessingWorkflow implements Workflow {
    
    @Override
    public WorkflowStub create() {
        return context -> {
            WorkflowTaskOptions options = taskOptions();
            // ...
        };
    }
}

注意:工作流本身不能直接做 I/O!所有外部交互必须交给 Activity。比如验证司机身份:

@Component
public class ValidateDriverActivity implements WorkflowActivity {
    
    @Autowired
    private RestTemplate restTemplate;
    
    @Override
    public Object run(WorkflowActivityContext context) {
        RideWorkflowRequest request = ctx.getInput(RideWorkflowRequest.class);
        logger.info("Validating driver: {}", request.getDriverId());
        if (request.getDriverId() != null && !request.getDriverId().isEmpty()) {
            logger.info(
"Driver {} validated successfully", request.getDriverId());
            return true;
        }
        throw new IllegalArgumentException(
"Invalid driver ID");
    }
}

看出来没?Activity 就是标准的 Spring Bean,可以注入任何依赖。而工作流通过 context.callActivity() 去调用它,用 .await() 等待结果——写法像同步,底层却是异步非阻塞的。这种“假同步真异步”的编程模型,极大降低了心智负担。

一行注解开启自动发现:Spring Boot 与 Dapr 的无缝融合

为了让开发更“Spring”,Dapr 提供了 @EnableDaprWorkflows 注解。你只需要把它加到主启动类上:

@EnableDaprWorkflows
@SpringBootApplication
public class DaprWorkflowApp {
    public static void main(String[] args) {
        SpringApplication.run(DaprWorkflowApp.class, args);
    }
}

Spring Boot 就会自动扫描所有实现 Workflow 和 WorkflowActivity 的类,注册为 Bean,并注入到 Dapr 运行时。这意味着你不用写一行注册代码,不用维护配置文件,完全遵循“约定优于配置”的哲学。这种深度集成,让 Java 开发者几乎感觉不到 Dapr 的存在——但它确实在背后默默为你扛起了分布式系统的复杂性。

从 REST 到 PubSub:工作流如何被触发?

工作流可以由多种事件启动。最直接的方式是通过 REST API:

@RestController
@RequestMapping("/workflow")
public class RideWorkflowController {
    @Autowired
    DaprWorkflowClient workflowClient;
    @PostMapping(
"/start-ride")
    public RideWorkflowRequest startRideWorkflow(
      @RequestBody RideWorkflowRequest request) {
        String instanceId = workflowClient.scheduleNewWorkflow(
          RideProcessingWorkflow.class, request); 
        request.setWorkflowInstanceId(instanceId);
        return request;
    }
}

但更强大的地方在于:它能和 Dapr PubSub 深度联动。比如当“司机接单”事件发布到 ride-hailing 主题时,自动启动工作流:

@PostMapping("/driver-accepted")
@Topic(pubsubName =
"ride-hailing", name = "driver-acceptance")
public void onDriverAcceptance(@RequestBody CloudEvent<RideWorkflowRequest> event) {
    RideWorkflowRequest request = event.getData();
    workflowClient.scheduleNewWorkflow(RideProcessingWorkflow.class, request);
}

这意味着你的系统既能响应用户操作(REST),也能响应内部事件(PubSub),形成闭环。更酷的是,工作流还能“等待外部事件”——比如在通知乘客后,暂停流程,直到乘客点击“确认上车”:

context.waitForExternalEvent("passenger-confirmation", Duration.ofMinutes(5), String.class).await();

这种“暂停-唤醒”机制,让长周期交互变得极其简单。

一步步编排:从验证司机到完成订单的全流程实现

现在我们把所有 Activity 串起来。首先是验证司机:

context.getLogger().info("Step 1: Validating driver {}", request.getDriverId());
boolean isValid = context.callActivity(
  ValidateDriverActivity.class.getName(), request, options, boolean.class)
    .await();
if (!isValid) {
    context.complete(new RideWorkflowStatus(
      request.getRideId(),
"FAILED", "Driver validation failed"));
    return;
}

接着计算车费(这里简化为固定公式):

double fare = context.callActivity(
  CalculateFareActivity.class.getName(), request, options, double.class)
    .await();

然后通知乘客(实际项目中可集成短信/推送服务):

NotificationInput notificationInput = new NotificationInput(request, fare);
String notification = context.callActivity(
  NotifyPassengerActivity.class.getName(), notificationInput, options, String.class)
    .await();

再等待乘客确认——超时 5 分钟未确认就自动取消:

String confirmation = context.waitForExternalEvent(
  "passenger-confirmation", Duration.ofMinutes(5), String.class)
    .await();
if (!
"confirmed".equalsIgnoreCase(confirmation)) {
    context.complete(new RideWorkflowStatus(
      request.getRideId(), 
     
"CANCELLED"
     
"Passenger did not confirm the ride within the timeout period"));
    return;
}

最后,一切顺利,标记为“已完成”:

RideWorkflowStatus status = new RideWorkflowStatus(
  request.getRideId(), "COMPLETED", message);
context.complete(status);

整个流程清晰如教科书,却具备生产级的容错能力。

测试不再头疼:用 Testcontainers 模拟真实工作流

很多团队不敢用工作流,就是因为“怎么测”?Dapr 官方提供了 dapr-spring-boot-starter-test 模块,结合 Testcontainers,让你在 CI/CD 中跑完整流程测试。比如“快乐路径”测试:

@Test
void whenWorkflowStartedViaRest_thenAllActivitiesExecute() {
    // 发起工作流
    RideWorkflowRequest response = given().body(workflowRequest).post(
"/workflow/start-ride").as(...);
    String instanceId = response.getWorkflowInstanceId();
    
   
// 等待工作流运行中
    await().until(() -> workflowClient.getInstanceState(instanceId, false).getRuntimeStatus() == RUNNING);
    
   
// 触发乘客确认事件
    given().body(
"confirmed").post("/workflow/confirm/" + instanceId);
    
   
// 等待完成
    await().until(() -> workflowClient.getInstanceState(instanceId, false).getRuntimeStatus() == COMPLETED);
    
   
// 验证结果
    assertEquals(COMPLETED, finalStatus.getRuntimeStatus());
}

而“失败路径”测试更显价值:

@Test
void whenActivityFails_thenRetryPolicyApplies() {
    // 传入无效司机 ID
    RideWorkflowRequest invalidRequest = new RideWorkflowRequest(..., "", null);
    String instanceId = workflowClient.scheduleNewWorkflow(...);
    
    // 等待最终失败(重试 3 次后)
    await().until(() -> status.getRuntimeStatus() == FAILED);
    assertEquals(FAILED, finalStatus.getRuntimeStatus());
}

重试策略:不只是“再试一次”,而是智能退避

Dapr 的重试不是简单循环。你可以配置:

- 最大重试次数(maxRetries)
- 初始退避时间(backoffTimeout)
- 退避系数(backoffCoefficient):每次重试间隔 = 上次 × 系数
- 最大重试间隔(maxRetryInterval)
- 单次任务超时(maxTimeout)

private WorkflowTaskOptions taskOptions() {
    int maxRetries = 3;
    Duration backoffTimeout = Duration.ofSeconds(1);
    double backoffCoefficient = 1.5;
    Duration maxRetryInterval = Duration.ofSeconds(5);
    Duration maxTimeout = Duration.ofSeconds(10);
    WorkflowTaskRetryPolicy retryPolicy = new WorkflowTaskRetryPolicy(
      maxRetries, backoffTimeout, backoffCoefficient, maxRetryInterval, maxTimeout);
    return new WorkflowTaskOptions(retryPolicy);
}

这意味着短暂的网络抖动(如数据库连接超时)会被自动恢复,而持续性错误(如参数错误)则快速失败,避免资源浪费。

上云一步到位:用 Diagrid Catalyst 免运维部署

本地开发完,上线怎么办?自己搭 Dapr 集群?配状态存储?管工作流引擎?太重了!Diagrid Catalyst 是 Dapr 背后公司推出的全托管服务。只需一行 CLI 命令:

diagrid dev run \
  --project spring-boot \
  --app-id dapr-workflows \
  --app-port 60603 \
  -- mvn spring-boot:run

你的本地应用就自动连接到云端 Dapr 服务,享受生产级的高可用、持久化存储和监控。开发调试如本地,上线即生产——这才是现代云原生开发该有的体验。