Spring Cloud数据流中的组合函数支持


Spring Cloud Stream最近添加了一项Function,可将函数定义组合到现有的Spring Cloud Stream应用程序中。在本博客中,我们将看到Spring Cloud Data Flow如何利用此功能在Streaming管道中组合函数。

它有什么不同?
在Spring Cloud Data Flow中,流数据管道由Spring Cloud Stream应用程序组成,开发人员可以选择开箱即用的流应用程序,其中包含许多常见用例。开发人员还可以使用Spring Cloud Stream框架扩展这些开箱即用的应用程序或创建自定义应用程序。

Spring Cloud Stream 2.1.0 GA 已经集成了一个基于Spring Cloud Function-based编程模型,可以使用java.util.Function,一个java.util.Consumer,和一个java.util.Supplier表示业务逻辑,其相应的对应的角色是Spring Cloud Stream中的Processor,Sink和Source。
鉴于这种两者结合映射的灵活性,Spring Cloud Stream框架现在支持一种简单但功能强大的函数组合方法。这种函数组合可以是源Source和处理器Processor组合成一个单个应用程序:“新源Source”;或者,它可能是处理器Processor+接收器Sink组合到一个新的应用程序中:“新的Sink”。这种灵活性为流应用程序开发人员开辟了有趣的新方式。

让我们看看如何通过三个应用程序创建管道来执行简单转换,然后使用两个使用函数组合的应用程序来了解如何将其实现为管道。

Streaming Pipeline有三个应用程序
对于第一个流,
我们将使用开箱即用的http-source,transform-processor和log-sink的三个应用程序。
首先,启动Spring Cloud Data Flow local服务器:
java -jar spring-cloud-dataflow-server-local-1.7.3.RELEASE.jar

然后,启动Spring Cloud数据流shell:
java -jar spring-cloud-dataflow-shell-1.7.3.RELEASE.jar

现在让我们使用RabbitMQ绑定器(或Kafka绑定器)分别注册HTTP源source,变换器处理器processor和日志接收器sink 作为应用程序:

dataflow:>app register --name http --type source --uri https://repo.spring.io/milestone/org/springframework/cloud/stream/app/http-source-rabbit/2.1.0.M2/http-source-rabbit-2.1.0.M2.jar

注册处理器:

dataflow:>app register --name transformer --type processor --uri https://repo.spring.io/milestone/org/springframework/cloud/stream/app/transform-processor-rabbit/2.1.0.M2/transform-processor-rabbit-2.1.0.M2.jar

注册接收器sink:

dataflow:>app register --name log --type sink --uri https://repo.spring.io/milestone/org/springframework/cloud/stream/app/log-sink-rabbit/2.1.0.M2/log-sink-rabbit-2.1.0.M2.jar

现在我们可以创建一个没有函数组合的简单流:

dataflow:>stream create hello --definition "http --server.port=9000 | transformer --expression=(\"Hello \"+payload.toString().toUpperCase()) | log"

然后我们可以部署流:

dataflow:>stream deploy hello --properties "deployer.*.local.inheritLogging=true"


dataflow:>http post --data
"friend" --target "http://localhost:9000"
POST (text/plain) http:
//localhost:9000 friend
202 ACCEPTED

您可以在log应用程序中看到以下日志消息:

[sformer.hello-1] log-sink                                 : Hello FRIEND

在此流中,我们将http(Source),转换器(Processor)和日志(Sink)这个应用程序部署为目标平台中的独立的应用程序(在本例中,它是local)。对于某些用例,对于这种简单的有效负载转换逻辑,我们可能希望将Processor应用程序与Source或Sink应用程序结合使用。例如,在源输出数据中屏蔽某些特定用户特定字段的转换方案不一定需要部署为单独的独立应用程序。相反,它可以在Source或Sink应用程序中组合。

为了将Processor函数组合到源或接收器应用程序中,我们使用Spring Cloud Stream的函数组合支持。

Cloud Stream的函数组合
在Spring Cloud Stream中的函数组合的支持是基于Spring Cloud 函数的,让java.util.Supplier,java.util.Consumer以及java.util.Function注册作为春季@Bean的定义。这些函数@Bean定义可在运行时用于组合。
Spring Cloud Stream引入了一个名为的新属性,spring.cloud.stream.function.definition它对应于Spring Cloud Function中的函数定义DSL。设置此属性后,在运行时将自动链接所需的函数@bean。

函数组合以下列方式发生:

  • 当Spring Cloud Stream应用程序是Source类型时,在源Source之后作为output应用组合函数。
  • 当Spring Cloud Stream应用程序是Sink类型时,组合函数应用在接收器sink之前作为input。

这使得能够将函数(在Spring Cloud Function DSL中定义)组合到现有的Spring Cloud Stream应用程序中,然后由Spring Cloud Data Flow在流数据管道中进行编排。

函数组合案例
让我们创建并部署一个流,该流将前一个示例的变换器表达式组合进入Source应用程序本身。变换器逻辑通过使用两个java.util.Function实现来完成。
我们将创建一个新的源应用程序,我们将其称为http-transformer扩展开箱即用的http源应用程序。可以在此处找到新源应用程序的源代码。
该http-transformer应用程序包含upper和concat函数bean,定义如下:

@SpringBootApplication
@Import(org.springframework.cloud.stream.app.http.source.HttpSourceConfiguration.class)
public class HttpSourceRabbitApplication {

    @Bean
    public Function<String, String> upper() {
        return value -> value.toUpperCase();
    }

    @Bean
    public Function<String, String> concat() {
        return value -> "Hello "+ value;
    }


    public static void main(String[] args) {
        SpringApplication.run(HttpSourceRabbitApplication.class, args);
    }
}

clone github repo后,您可以使用maven构建应用程序:
cd function-composition/http-transformer
 ./mvnw clean package

现在http-transformer使用Data Flow Shell 注册应用程序。

注意:对于以下app注册--uri选项,请使用适合您系统的值替换工件的目录名称和路径。

dataflow:>app register --name http-transformer --type source --uri file:///Users/igopinathan/dev/git/ilayaperumalg/sandbox/function-composition/http-transformer/target/http-transformer-2.1.0.BUILD-SNAPSHOT.jar

现在让我们创建一个流:

dataflow:>stream create helloComposed --definition "http-transformer --server.port=9001 | log"

在部署流时,我们传递spring.cloud.stream.function.definition属性以定义组合函数DSL(在Spring Cloud Function中定义)。在这种情况下,它是:

dataflow:>stream deploy helloComposed --properties "app.http-transformer.spring.cloud.stream.function.definition=upper|concat,deployer.*.local.inheritLogging=true"

上面的部署将upper和concat函数bean组合到http源应用程序中。

然后我们可以将负载(测试内容)发送到http应用程序:

dataflow:>http post --data "friend" --target "http://localhost:9001"
> POST (text/plain) http:
//localhost:9001 friend
> 202 ACCEPTED

然后你可以在log应用程序中看到输出:

[helloComposed-1] log-sink                                 : Hello FRIEND


请注意,函数组合支持不适用于开箱即用的Spring Cloud Stream Processor应用程序,因为在现有处理器的应用程序逻辑之前或之后是否需要应用该功能存在不确定性。
但是,您可以使用标准java.util.Function API创建自己的处理器应用程序,使用函数组合,如以下示例所示:

@Configuration
public static class FunctionProcessorConfiguration {

@Bean
public Function<String, String> upperAndConcat() {
return upper().andThen(concat());
}

  @Bean
  public Function<String, String> upper() {
     return value -> value.toUpperCase();
  }

  @Bean
  public Function<String, String> concat() {
     return value -> "Hello "+ value;
  }
}

然后,您需要使用以下属性进行部署: 
spring.cloud.stream.function.definition=upperAndConcat

Kotlin支持
另一个有趣的特性是Spring Cloud Function支持Kotlin函数的功能组合。这允许我们将任何Kotlin函数bean添加到组合函数Source或Sink应用程序中。
要看到这个工作,让我们使用http-transformer-kotlin-processor我们的示例github 存储库中的应用程序。
Kotlin函数bean配置为处理器。这里,Kotlin函数bean是transform如下定义的函数:

@Bean
open fun transform(): (String) -> String {
   return { "How are you ".plus(it) }
}

此外,该项目还具有spring-cloud-function-kotlin依赖性,可以对Kotlin函数应用函数配置支持,定义如下:

<dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-function-kotlin</artifactId>
      <version>2.0.0.RELEASE</version>
    </dependency>

cd function-composition/http-transformer-kotlin 
./mvnw clean package

对于以下app注册--uri选项,请使用适合您系统的值替换工件的目录名称和路径:

dataflow:>app register --name http-transformer-kotlin --type source --uri file:///Users/igopinathan/dev/git/ilayaperumalg/sandbox/function-composition/http-transformer-kotlin/target/http-transformer-kotlin-2.1.0.BUILD-SNAPSHOT.jar

要使用此应用程序创建流,请执行以下操作Source:

dataflow:>stream create helloComposedKotlin --definition "http-transformer-kotlin --server.port=9002 | log"

正如我们在http-transformer示例中所做的那样,我们可以使用该spring.cloud.stream.function.definition属性来指定任何有效的组合函数DSL来构造函数组合。在这种情况下,让我们将通过Java配置注册的函数bean与来自Kotlin处理器配置的函数bean结合起来。

dataflow:>stream deploy helloComposedKotlin --properties "app.http-transformer-kotlin.spring.cloud.stream.function.definition=upper|transform|concat,deployer.*.local.inheritLogging=true"

这里,函数名transform对应于Kotlin函数。
注意:我们可以在Kotlin函数和Java函数之间执行组合,因为Kotlin函数在内部转换为java.util.Function。

dataflow:>http post --data "friend" --target "http://localhost:9002"
> POST (text/plain) http:
//localhost:9002 friend
> 202 ACCEPTED

并且,您可以在log应用程序中看到输出为:
[omposedKotlin-1] log-sink : Hello How are you FRIEND

在此示例中,http-transformer还包含函数的源代码。但是,您可以通过在单独的工件中定义函数bean来使应用程序更加模块化。然后,您可以通过仅向项目添加maven依赖项并设置spring.cloud.stream.function.definition属性来构建应用程序。通过这种方式,您可以将大部分业务逻辑编码为函数,并且如果需要,可以使用Source或Sink组合它。