Spring Cloud Stream的函数式和响应式Reactive编程特点 - spring.io

19-10-26 banq

Spring Cloud Stream(SCSt)的函数式和反应式Reactive编程带来更少的代码、更少的配置。不过,最重要的是,您的代码是完全分离的,并且与SCSt的内部结构无关。

虽然下面描述的所有功能都是SCSt的依赖项Spring Cloud Function(SCF)的functional,但是在理解SCSt上下文中功能的附加含义时,您必须意识到某些细微差别。

供应商,函数和消费者

类型Supplier, Function, or Consumer的任何bean或可以映射到它们的任何Bean都可以被SCST视为一个消息处理程序(Function或 Consumer),或者被视为一个消息源(Supplier),根据使用的函数策略类型,使用<function-name>-<in/out>-<index>命名约定自动生成输入和输出绑定。

看下面案例:

@SpringBootApplication
public class SampleApplication  {
    @Bean
    public Function<String, String> uppercase() {
        return value -> value.toUpperCase();
    }
}

这里的函数被视为一个消息处理程序,该消息处理程序消费来自绑定的uppercase-in-0,结果发送到绑定的uppercase-out-0。其余部分可以像以前一样使用。例如

--spring.cloud.stream.bindings.uppercase-in-0.content-type=text/plain。

单击此处了解更多详细信息和配置选项。

命令式或反应式

函数可以是命令式或反应性。命令式函数在每个单独的事件上触发,而反应式函数仅触发一次,将引用传递给Project Reactor提供的整个事件流抽象(例如Flux和Mono)。

命令式:

@SpringBootApplication
public class SampleApplication  {
    @Bean
    public Function<String, String> uppercase() {
        return value -> value.toUpperCase();
    }
}

反应性:

@SpringBootApplication
public class SampleApplication  {
    @Bean
    public Function<Flux<String>, Flux<String>> uppercase() {
        return flux -> flux.map(value -> value.toUpperCase());
    }
}

除了为您提供不同的(单例)编程样式来处理事件之外,反应式编程还为某些用例增加了附加值,否则实现起来将非常复杂。尽管本文讨论这些用例或响应模式超出了讨论范围,但仍然值得一提的是状态管理用例(例如窗口,聚合和分组)以及拆分流或用例的情况。合并多个流,这将在下一节中讨论。

关于反应式功能的绑定和命名规则,它们与上一节中说明的相同。

注意:虽然前面的示例仅Function作为示例,但相同的规则适用于Supplier和Consumer。用户指南的Spring Cloud Function支持部分以及Reactor文档提供了更多详细信息。

函数性

有时需要对数据流进行分类和组织。例如,考虑一个经典的大数据用例,即处理包含“订单”和“发票”的无组织数据,并且您希望每个都进入一个单独的数据存储中。在这里发挥功能支持(具有多个输入和输出的功能)的支持。

让我们看一个这样的函数的示例(此处提供完整的实现细节):

@Bean
public Function<Flux<Integer>, Tuple2<Flux<String>, Flux<String>>> organise() {
    return flux -> ...;
}

鉴于Project Reactor是SCF的核心依赖项,我们将使用其Tuple库。Tuple向我们传达基数cardinality和类型信息而给我们带来了独特的优势。两者对于SCSt而言都是极其重要的。基数让我们知道需要创建多少个输入和输出绑定并将其绑定到函数的相应输入和输出。知道类型信息可确保正确的类型转换。

同样,这是绑定名称命名约定的“索引”部分起作用的地方,因为在此函数中,两个输出绑定名称为organise-out-0和organise-out-1。

重要提示:目前,仅针对Function<TupleN<Flux<?>...>, TupleN<Flux<?>...>>以复杂事件处理为中心的反应性函数()支持functional统一性,在复杂事件处理中,对事件融合的评估和计算通常需要查看事件流,而不是单个事件。

有关最新信息,请阅读用户指南中的“ 具有多个输入和输出参数的函数”部分