基于Spring Integration和Apache Camel的SEDA


SEDA,或阶段化事件驱动架构,是Matt Welsh在他的博士论文中提出的一种架构风格。论文。它的主要优点是可伸缩性、支持高并发流量和可维护性。

由于可伸缩性是SEDA的核心目标,因此通常最好设计专注于特定操作的小阶段,特别是当我们有I/O密集型任务时。而且,拥有小的阶段有助于我们更好地调整每个阶段的规模。

我们的示例问题将是简单明了的:计算每个单词在给定字符串中不区分大小写出现的次数。
让我们把一个单词定义为一系列没有空格的字符,我们将忽略标点符号等其他复杂因素。我们的输出将是一个映射,其中包含作为键的单词和作为值的计数。


为了解决字数统计问题,我们可以通过以下几个阶段来实现解决方案:

  • 接受文本
  • 分割词语
  • 转换小写
  • 单词计数
  • 返回

现在我们已经有了阶段设计,让我们在下一节中使用两种不同的企业集成技术来实现它。在此表中,我们可以预览SEDA将如何在我们的实现中显示:

使用Spring集成的解决方案
对于我们的第一个实现,我们将使用Spring Integration。Spring Integration构建在Spring模型之上,以支持流行的企业集成模式。
Spring Integration有三个主要组件:

  1. 消息是包含报头和正文的数据结构。
  2. 通道将消息从一个终结点传送到另一个终结点。 Spring Integration中有两种通道:
    • 点对点:只有一个终结点可以使用此通道中的消息。
    • 发布-订阅:多个终结点可以使用此通道中的消息。
  • 终结点将消息路由到执行某些业务逻辑的应用程序组件。 SpringIntegration中有各种各样的端点,比如转换器、路由器、服务激活器和过滤器。

    1、准备阶段

    <dependencies>
        <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        </dependency>
        <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-test</artifactId>
        <scope>test</scope>
        </dependency>
    </dependencies>

    消息传递网关是一个代理,它隐藏了向集成流发送消息的复杂性。让我们为Spring集成流程设置一个:

    @MessagingGateway
    public interface IncomingGateway {
        @Gateway(requestChannel = "receiveTextChannel", replyChannel = "returnResponseChannel")
        public Map<String, Long> countWords(String input);
    }

    在SEDA下,通道需要通过关联的线程池进行扩展,因此让我们从创建线程池开始:

    @Bean("receiveTextChannelThreadPool")
    TaskExecutor receiveTextChannelThreadPool() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(1);
        executor.setMaxPoolSize(5);
        executor.setThreadNamePrefix("receive-text-channel-thread-pool");
        executor.initialize();
        return executor;
    }

    接下来,我们将使用线程池创建通道:

    @Bean(name = "receiveTextChannel")
    MessageChannel getReceiveTextChannel() {
        return MessageChannels.executor("receive-text", receiveTextChannelThreadPool)
          .get();
    }

    MessageChannels是一个Spring Integration类,它帮助我们创建各种类型的通道。这里,我们使用executor()方法创建一个 ExecutorChannel,它是一个由线程池管理的通道。

    2、SEDA接受文件阶段

    我们的渠道建立后,我们就可以开始实施我们的阶段了。让我们创建初始阶段:

    @Bean
    IntegrationFlow receiveText() {
        return IntegrationFlows.from(receiveTextChannel)
          .channel(splitWordsChannel)
          .get();
    }

    IntegrationFlows是一个流畅的Spring集成API,用于创建 IntegrationFlow对象,表示我们的流的各个阶段。


    3、SEDA拆分单词阶段
    我们的下一阶段只有一项责任:将我们的输入字符串拆分成一个 字符串数组,其中包含句子中的各个单词:

    @Bean
    IntegrationFlow splitWords() {
        return IntegrationFlows.from(splitWordsChannel)
          .transform(splitWordsFunction)
          .channel(toLowerCaseChannel)
          .get();
    }

    4、SEDA转为小写阶段

    @Bean
    IntegrationFlow toLowerCase() {
        return IntegrationFlows.from(toLowerCaseChannel)
          .split()
          .transform(toLowerCase)
          .aggregate(aggregatorSpec -> aggregatorSpec.releaseStrategy(listSizeReached)
            .outputProcessor(buildMessageWithListPayload))
          .channel(countWordsChannel)
          .get();
    }

    我们在这里使用的第一个新的 IntegrationFlows 方法是 split()。split()方法使用splitter模式,将我们的输入消息的每个元素作为单独的消息发送到toLowerCase。

    我们看到的下一个新方法是aggregator(),它实现了聚合器模式。聚合器模式有两个基本参数。

    • 发布策略,它决定何时将消息合并成一个单一的消息
    • 处理器,它决定了如何将消息合并成一个单一的消息。

    我们的发布策略函数使用listSizeReached,它告诉聚合器在输入数组的所有元素都被收集后开始聚合。

    5、计词阶段
    我们的最后阶段将我们的单词计数打包成一个Map,其中键是原始输入的单词,而值是每个单词的出现次数。

    @Bean
    IntegrationFlow countWords() {
        return IntegrationFlows.from(countWordsChannel)
          .transform(convertArrayListToCountMap)
          .channel(returnResponseChannel)
          .get();
    }

    使用Apache Camel的解决方案
    Apache Camel是一个流行而强大的开源集成框架。它基于四个主要概念。

    • Camel上下文。Camel运行时将不同的部分粘在一起。
    • 路由。路由决定了一个消息应该如何被处理,以及它接下来应该去哪里。
    • 处理程序。这些是各种企业集成模式的即用型实现。
    • 组件。组件是通过JMS、HTTP、文件IO等集成外部系统的扩展点。

    详细点击标题

    这些示例的源代码可在 GitHub 上获得。