SEDA,或阶段化事件驱动架构,是Matt Welsh在他的博士论文中提出的一种架构风格。论文。它的主要优点是可伸缩性、支持高并发流量和可维护性。
由于可伸缩性是SEDA的核心目标,因此通常最好设计专注于特定操作的小阶段,特别是当我们有I/O密集型任务时。而且,拥有小的阶段有助于我们更好地调整每个阶段的规模。
我们的示例问题将是简单明了的:计算每个单词在给定字符串中不区分大小写出现的次数。
让我们把一个单词定义为一系列没有空格的字符,我们将忽略标点符号等其他复杂因素。我们的输出将是一个映射,其中包含作为键的单词和作为值的计数。
为了解决字数统计问题,我们可以通过以下几个阶段来实现解决方案:
现在我们已经有了阶段设计,让我们在下一节中使用两种不同的企业集成技术来实现它。在此表中,我们可以预览SEDA将如何在我们的实现中显示:
使用Spring集成的解决方案
对于我们的第一个实现,我们将使用Spring Integration。Spring Integration构建在Spring模型之上,以支持流行的企业集成模式。
Spring Integration有三个主要组件:
- 消息是包含报头和正文的数据结构。
- 通道将消息从一个终结点传送到另一个终结点。 Spring Integration中有两种通道:
- 点对点:只有一个终结点可以使用此通道中的消息。
- 发布-订阅:多个终结点可以使用此通道中的消息。
终结点将消息路由到执行某些业务逻辑的应用程序组件。 SpringIntegration中有各种各样的端点,比如转换器、路由器、服务激活器和过滤器。
1、准备阶段
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-test</artifactId> <scope>test</scope> </dependency> </dependencies>
|
消息传递网关是一个代理,它隐藏了向集成流发送消息的复杂性。让我们为Spring集成流程设置一个:
@MessagingGateway public interface IncomingGateway { @Gateway(requestChannel = "receiveTextChannel", replyChannel = "returnResponseChannel") public Map<String, Long> countWords(String input); }
|
在SEDA下,通道需要通过关联的线程池进行扩展,因此让我们从创建线程池开始:
@Bean("receiveTextChannelThreadPool") TaskExecutor receiveTextChannelThreadPool() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(1); executor.setMaxPoolSize(5); executor.setThreadNamePrefix("receive-text-channel-thread-pool"); executor.initialize(); return executor; }
|
接下来,我们将使用线程池创建通道:
@Bean(name = "receiveTextChannel") MessageChannel getReceiveTextChannel() { return MessageChannels.executor("receive-text", receiveTextChannelThreadPool) .get(); }
|
MessageChannels是一个Spring Integration类,它帮助我们创建各种类型的通道。这里,我们使用executor()方法创建一个 ExecutorChannel,它是一个由线程池管理的通道。
2、SEDA接受文件阶段
我们的渠道建立后,我们就可以开始实施我们的阶段了。让我们创建初始阶段:
@Bean IntegrationFlow receiveText() { return IntegrationFlows.from(receiveTextChannel) .channel(splitWordsChannel) .get(); }
|
IntegrationFlows是一个流畅的Spring集成API,用于创建 IntegrationFlow对象,表示我们的流的各个阶段。
3、SEDA拆分单词阶段
我们的下一阶段只有一项责任:将我们的输入字符串拆分成一个 字符串数组,其中包含句子中的各个单词:
@Bean IntegrationFlow splitWords() { return IntegrationFlows.from(splitWordsChannel) .transform(splitWordsFunction) .channel(toLowerCaseChannel) .get(); }
|
4、SEDA转为小写阶段
@Bean IntegrationFlow toLowerCase() { return IntegrationFlows.from(toLowerCaseChannel) .split() .transform(toLowerCase) .aggregate(aggregatorSpec -> aggregatorSpec.releaseStrategy(listSizeReached) .outputProcessor(buildMessageWithListPayload)) .channel(countWordsChannel) .get(); }
|
我们在这里使用的第一个新的 IntegrationFlows 方法是 split()。split()方法使用splitter模式,将我们的输入消息的每个元素作为单独的消息发送到toLowerCase。我们看到的下一个新方法是aggregator(),它实现了聚合器模式。聚合器模式有两个基本参数。
- 发布策略,它决定何时将消息合并成一个单一的消息
- 处理器,它决定了如何将消息合并成一个单一的消息。
我们的发布策略函数使用listSizeReached,它告诉聚合器在输入数组的所有元素都被收集后开始聚合。5、计词阶段
我们的最后阶段将我们的单词计数打包成一个Map,其中键是原始输入的单词,而值是每个单词的出现次数。
@Bean IntegrationFlow countWords() { return IntegrationFlows.from(countWordsChannel) .transform(convertArrayListToCountMap) .channel(returnResponseChannel) .get(); }
|
使用Apache Camel的解决方案
Apache Camel是一个流行而强大的开源集成框架。它基于四个主要概念。
- Camel上下文。Camel运行时将不同的部分粘在一起。
- 路由。路由决定了一个消息应该如何被处理,以及它接下来应该去哪里。
- 处理程序。这些是各种企业集成模式的即用型实现。
- 组件。组件是通过JMS、HTTP、文件IO等集成外部系统的扩展点。
详细点击标题
这些示例的源代码可在 GitHub 上获得。