使用Activiti和Spring集成实现BPEL和BPMN

  BPEL流程自动管理和BPM人工工作流是两种流程,前者主要将现有的服务按照流程定义规则进行调度组合协同,是纯粹的机器之间的协同工作,而BPM代表的工作流是有人工参与的协同工作。

BPEL是一种基于XML的流程规范语言,主要关注自动化;BPMN 最初是一个纯粹的图形化业务流程表示法,是BPM的符号表示法。BPMN和BPEL经常结合使用BPMN用于以业务用户为中心的视角,BPEL用于机器之间的技术规范。

到了BPMN版本2.0,BPMN标准中添加了自己的XML格式。因此,BPEL在BPMN环境中变得不那么重要了,因为BPMN现在满足了大部分业务和IT需求,目前,BPMN 2.0 XML格式可以说是在系统之间传输过程模型的最流行的标准,包括了BPEL的一部分。

Activiti是一个流程引擎核心,能够接受流程定义,运行时记录流程状态,可以作为整个流程的记录器,或状态机,Spring-Intergration是Spring的集成框架,能够实现服务之间的集成调度,这两者结合可以实现流程的人机交互协同工作。

本文翻译自Spring官方文档

什么是Activiti

    Activiti是一个业务流程引擎。它基本上是一个有节点(状态)的有向图,用于模拟复杂业务流程的状态。它跟踪业务流程中描述的工作进度。它描述了系统中的自动和基于人的角色。它还支持询问业务流程引擎,询问有关正在进行的流程实例的问题:有多少存在,哪些存在停滞等。业务流程管理系统(BPMS)提供了许多优势,其中一些优点是:

  • 协作过程,人类和服务被用于推动更大的业务需求(想象贷款批准,法律合规,文档修订等)
  • 它们支持组织中重要业务流程的审计和日志记录。这在监管环境中非常宝贵。
  • BPM引擎旨在处理长时间运行的进程状态,这意味着您的域模型不再需要充满特定于进程状态的特定字段,如is_enrolled或reset_password_date。
  • 易于建模的复杂业务流程

最后一点值得关注:像Activiti这样的优秀BPM引擎支持可视化建模业务流程。UML支持使用活动(步骤)和泳道(参与满足这些步骤所涉及的代理)直观地描述流程。当然,UML只是一种建模工具。它没有运行时语义。业务流程管理的圣杯一直是有一个可供业务分析师应用程序开发人员使用的建模符号。BPMN 2就像我们实现这一目标一样接近。

例如,这是一个非常简单的业务流程的可视化模型。

这是为支持该模型而创建的标准XML标记。这个XML具有执行语义,而不仅仅是建模语义。

    <?xml version="1.0" encoding="UTF-8"?>
<definitions id="definitions"
xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL"
xmlns:activiti="http://activiti.org/bpmn"
typeLanguage="http://www.w3.org/2001/XMLSchema"
expressionLanguage="http://www.w3.org/1999/XPath"
targetNamespace="http://www.activiti.org/bpmn2.0">

<process id="asyncProcess">

<startEvent id="start"/>

<sequenceFlow
id="flow1"
sourceRef="start"
targetRef="sigw"/>

<serviceTask
id="sigw"
name="Delegate to Spring Integration"
activiti:delegateExpression="#{gateway}"/>

<sequenceFlow
id="scriptFlow"
sourceRef="sigw"
targetRef="confirmMovementTask"/>

<scriptTask
id="confirmMovementTask"
name="Execute script" scriptFormat="groovy">
<script>
println 'Moving on..'
</script>
</scriptTask>

<sequenceFlow
id="flow2"
sourceRef="confirmMovementTask"
targetRef="theEnd"/>

<endEvent id="theEnd"/>

</process>

</definitions>

 

大多数分析师不会手工编写这个XML。相反,他们将使用像Alfresco的Activiti Cloud这样的建模工具。然而,XML工件是可循环访问的:它可以由开发人员注释,然后在建模工具中进行修改。

但是,在检查时,您会发现大部分内容并不复杂。该过程有四种状态:

  • startEvent - 流程开始的地方
  • serviceTask- 调用Spring bean调用gateway以在Spring Integration中启动一些处理(稍后会详细介绍!)
  • scriptTask - 使用Groovy打印出一个简单的确认消息
  • endEvent - 完成

订单由sequenceFlow连接节点的各种元素明确说明。

Activiti是跟踪业务流程状态的好方法,但它不是一个特别强大的组件模型或集成技术。为此,我们需要一个像Spring Integration这样的集成框架。

什么是Spring Integration

    Spring Integration支持跨多个不兼容的系统集成服务和数据。从概念上讲,编写集成流类似于在UNIX OS上使用stdin和组合管道和过滤器流stdout:

cat input.txt |  grep ERROR | wc -l > output.txt

在该示例中,我们从源(文件input.txt)获取数据,将其传递给grep命令以过滤结果并仅保留包含令牌的行,ERROR然后将其传送到wc实用程序,我们将计算它有多少行。最后,最终计数被写入输出文件,output.txt。这些组件 - cat,grep和wc- 彼此都不知道。它们并不是为了彼此而设计的。相反,他们只知道如何阅读stdin和写作stdout。这种数据标准化使得从简单原子组成复杂解决方案变得非常容易。在该示例中,cat文件的行为将数据转换为任何stdin可识别的进程可读取的数据。它适应入站数据为规范化格式,字符串行。最后,redirect(>)操作符将规范化数据(字符串行)转换为文件系统上的数据。它适应它。pipe(|)字符用于表示一个组件的输出应该流向另一个组件的输入。

Spring Integration流程的工作方式相同:数据被规范化为Message<T>实例。每个Message<T>都有一个有效负载和头 - 关于有效负载的元数据Map<K,V>- 它们是不同消息传递组件的输入和输出。这些消息传递组件通常由Spring Integration提供,但您可以轻松编写和使用自己的组件。各种消息传递组件都支持所有企业应用程序集成模式(过滤器,路由器,变换器,适配器,网关等)。Spring框架MessageChannel是一个命名管道,通过它Message<T>在消息传递组件之间流动。它们是管道,默认情况下,它们的工作方式类似于java.util.Queue。数据输入,数据输出。

Spring集成入站适配器来自外部系统(如RabbitMQ,Apache Kafka和JMS,电子邮件,Twitter,文件系统挂载,物联网设备和其他许多系统的消息队列)的数据调整为Message<T>s。Spring Integration出站适配器以相反的方式执行相同的操作,Message<T>并将其写入外部系统(例如,作为电子邮件,Tweet或Apache Kafka消息)。

Spring Integration支持事件驱动的体系结构,因为它可以帮助检测并响应外部世界中的事件。例如,您可以使用Spring Integration每10秒轮询一次文件系统,并Message<T>在出现新文件时发布。您可以使用Spring Integration充当传递给Apache Kafka主题的消息的侦听器。适配器处理响应外部事件并使您免于担心发起消息,并让您在消息到达后专注于处理消息。它是依赖注入的集成等价物!

依赖注入使组件代码不再担心资源初始化和获取,并使其可以专注于编写具有这些依赖关系的代码。该javax.sql.DataSource领域来自哪里?谁在乎!Spring将它连接起来,它可能是从测试中的Mock,经典应用程序服务器中的JNDI或配置的Spring Boot bean获得的。组件代码仍然不知道这些细节。大约15年前,当我们第一次开始谈论依赖注入时,我们会谈到“好莱坞校长:”“不要打电话给我,我会打电话给你!”这更适用于Spring Integration!

入站网关接收来自外部系统的传入请求,将其作为Message<T>s处理,并发送回复。出站网关采用Message<T>s,将它们转发到外部系统,并等待来自该系统的响应。它们支持请求和回复交互。

Activiti和Spring集成网关

Activiti可用于描述记录,可审计和可预测状态的复杂,长期运行的过程,Spring Integration可用于集成!Spring Integration是我们保存有趣Java代码的地方,Activiti会跟踪总体状态。这个技巧在20年前很有用,今天它在大规模分布的微服务世界中也很有用,其中单个请求的处理可能跨越多个服务和节点。那么,Spring Boot,Spring Integration和Activiti可以很好地协同工作

一个常见的用例是使用Activiti启动BPM流程,然后在进入等待状态时,将该状态的处理委托给Spring Integration,当然,这可以将工作分散到其他系统。这是一个说明流程的简单图表。

BPM流程状态通常可以涉及人工代理。例如,工作流引擎可能具有将文档发送给人员以供批准的状态,但是审阅者正在度假并且将不会回来数周。保持线程开放是一种浪费,更不用说危险了,期望继续处理所需的任何确认都将在几毫秒或几秒内恢复。

Activiti足够聪明,可以暂停执行,在等待状态期间将执行状态持久保存到数据库,并且仅在发出流程执行信号后才恢复。发信号通知流程引擎可重新水化流程并恢复执行。一个简单的示例可能是新的用户注册业务流程,该流程委托Spring Integration发送带有注册确认链接的电子邮件。用户可能需要数天才能单击确认链接,但在单击时,系统应继续进行注册业务流程。

在这篇文章中,我们将讨论如何启动进入等待状态的BPM流程,然后委托Spring Integration进行某种自定义处理,然后在遥远的未来,继续执行流程。

我们将设置两个Spring Integration流程:一个用于处理来自Activiti的请求到Spring Integration,另一个用于处理最终的回复并触发恢复流程。

我们需要一些东西来启动我们的流程,所以这里是一个简单的REST端点(http://localhost:8080/start),每次启动一个新流程:

@RestController
class ProcessStartingRestController {

@Autowired
private ProcessEngine processEngine;

@RequestMapping(method = RequestMethod.GET, value = "/start")
Map<String, String> launch() {
ProcessInstance asyncProcess = this.processEngine.getRuntimeService()
.startProcessInstanceByKey("asyncProcess");
return Collections.singletonMap("executionId", asyncProcess.getId());
}
}

 

消息将跨越MesssageChannel我们将在@Configuration类中创建的两个通道:requests和replies。

@Configuration
class MessageChannels {

@Bean
DirectChannel requests() {
return new DirectChannel();
}

@Bean
DirectChannel replies() {
return new DirectChannel();
}
}

这两个通道类似一种队列通道,可以使用消息系统的队列实现,request是用来实现前面图中Activiti到Spring集成的请求,Activit可以通过这个通道发送需要Spring干的事情,而replies则是Spring干完事情的反馈结果。

使用@Configuration类的好处是它本身就是一个Spring组件,可以注入到任何地方。我们可以通过@Bean直接调用提供者的方法来取消引用通道。另一种方法是@Qualifier每次我们为其中一个通道注入引用时使用,例如:

public static final String REPLIES_CHANNEL_NAME = "replies";

@Autowired
@Qualifier(REPLIES_CHANNEL_NAME)
private MessageChannel replies;

 

我们的BPMN非常简单,但我们将使用一个特定于Activiti的命名空间属性activiti:delegateExpression="#{gateway}"来告诉Activiti需要调用一个名为gateway,它是在Spring中注册的bean 。Activiti知道这样做是因为这个应用程序是使用Spring引导的自动配置,其中包括将Spring管理的bean暴露给Activiti表达式语言。这个gateway是一种基于Activiti的bean类型ReceiveTaskActivityBehavior。Spring Boot具有Spring Integration和Activiti的自动配置,因此90%的繁琐设置都会消失。

让我们看一下我们的简单gateway组件,一个Activiti ActivityBehavior接口的实现,它充当回调函数,我们可以用其发送Message<T>到requests通道并启动Spring Integration流程。这里重要的是我们已经捕获了executionId,我们稍后需要恢复发出信号的过程。这里等于输入stdin;

@Bean
ActivityBehavior gateway(MessageChannels channels) {
return new ReceiveTaskActivityBehavior() {

@Override
public void execute(ActivityExecution execution) throws Exception {

Message<?> executionMessage = MessageBuilder
.withPayload(execution)
.setHeader("executionId", execution.getId())
.build();

channels.requests().send(executionMessage);
}
};
}

 

该Message<T>会触发通道requests MessageChannel另一边的处理。在一个复杂的例子中,将请求转换为有意义的消息,例如,将其转发到其他系统(如电子邮件)是小case。在这里,我们只打印出标题,以便我们可以记录executionId并稍后使用它。这里等于输出stdout;这里是请求通道,Activiti将命令Spring干的事情通过这个通道发给Spring,下面代码是Spring接受指令后干的事情,只是打印而已。

@Bean
IntegrationFlow requestsFlow(MessageChannels channels) {
return IntegrationFlows.from(channels.requests())
.handle(msg -> msg.getHeaders().entrySet()
.forEach(e -> log.info(e.getKey() + '=' + e.getValue())))
.get();
}

 

此时,工作流定义将保持不变,并且没有活动的流程实例。我们需要以某种方式异步地发出信号。我们实现一个REST端点来向Activiti发出信号:http://localhost:8080/resume/{executionId}。REST很容易理解,但实际上我们可以使用Spring Integration发送到任何外部系统中的一个事件来实现这种效果。唯一要确保的是,无论外部事件如何,我们都能以某种方式发送executionId,就像我们在这里通过在URI中捕获它一样。下面代码就是Spring向Activiti发出处理结果。

@RestController
class ProcessResumingRestController {

@Autowired
private MessageChannels messageChannels;

@RequestMapping(method = RequestMethod.GET, value = "/resume/{executionId}")
void resume(@PathVariable String executionId) {

Message<String> build = MessageBuilder.withPayload(executionId)
.setHeader("executionId", executionId)
.build();

this.messageChannels.replies().send(build);
}
}

 

当Message<T>流过replies MessageChannel,它会触发另一端处理,注意这里是响应通道,Spring干完事情后的结果通过这个通道返回给Activiti。在这里,我们将使用另一个Spring Integration流程来接收传入Message<T>并发出恢复流程的信号。执行此流程后,您将看到流程中的下一步scriptTask,评估和打印到控制台上的“Moving on!”字样。

@Bean
IntegrationFlow repliesFlow(MessageChannels channels,
ProcessEngine engine) {
return IntegrationFlows.from(channels.replies())
.handle(msg -> engine.getRuntimeService().signal(
String.class.cast(msg.getHeaders().get("executionId"))))
.get();
}

 

下一步

 

真正的强大功能是使用BPM来编排复杂的处理逻辑:想象一下在BPM流程中保存流程的状态,然后调用Spring Batch作业,或者RestTemplate在Spring Cloud中使用Ribbon负载平衡调用REST服务,或者将其转发Message<T>到Spring Cloud中数据流流程。Spring Cloud Data Flow是我最喜欢的数据处理方法之一,因为它建立在Spring Cloud Stream的基础之上,而Spring Cloud Stream又建立在Spring Integration上:它MessageChannel一直在最底层!

 

深入理解Activiti工作流

Spring Batch

Spring Cloud

Spring Boot

SOA

 

 

猜你喜欢