Spring Cloud Stream如何深度支持Apache Kafka?


Spring Cloud Stream提供了一种编程模型,可以立即连接到Apache Kafka。应用程序需要在其类路径中包含Kafka绑定器并添加一个名为的注释@EnableBinding,该注释将Kafka主题绑定到其输入或输出(或两者)。
Spring Cloud Stream提供了三个方便的接口来绑定@EnableBinding:( Source单输出),Sink(单输入)和Processor(单输入和输出)。它可以扩展到具有多个输入和输出的自定义接口。
以下代码片段显示了Spring Cloud Stream的基本编程模型:

@SpringBootApplication
@EnableBinding(Processor.class)
public class UppercaseProcessor {

  @StreamListener(Processor.INPUT)
  @SendTo(Processor.OUTPUT)
  public String process(String s) {
     return s.toUpperCase();
  }
}

请注意该方法已注释@StreamListener,Spring Cloud Stream提供该方法以接收来自Kafka主题的消息。同样的方法也用注释SendTo,这是一个方便的注释,用于将消息发送到输出目的地。这是一个Spring Cloud Stream Processor应用程序,它使用来自输入的消息并向输出生成消息。

前面的代码中没有提到Kafka主题Topic。此时可能出现的一个自然问题是,“此应用程序如何与Kafka通信?”答案是:使用Spring Boot支持的众多配置选项之一配置入站和出站主题。在这种情况下,我们使用名为YAML的配置文件application.yml,默认情况下会搜索该文件。以下是输入和输出目标的配置:

spring.cloud.stream.bindings:
  input:
    destination: topic1
  output:
    destination: topic2


Spring Cloud Stream将输入input映射到输出topic1和输出topic2。这是一组非常小的配置,但有更多选项可用于进一步自定义应用程序。默认情况下,主题是使用单个分区创建的,但可以由应用程序覆盖。有关更多信息,请参阅这些文档
最重要的是,开发人员可以专注于编写核心业务逻辑,并让Spring Cloud Stream和Spring Boot处理基础架构问题(例如连接到Kafka,配置和调整应用程序等)。
以下示例显示了另一个简单的应用程序(消费者):

@SpringBootApplication
@EnableBinding(Sink.class)
public class LoggingConsumerApplication {

  @StreamListener(Sink.INPUT)
  public void handle(Person person) {
     System.out.println("Received: " + person);
  }

  public static class Person {
     private String name;
     public String getName() {
        return name;
     }
     public void setName(String name) {
        this.name = name;
     }
     public String toString() {
        return this.name;
     }
  }
}

请注意,@EnableBinding带有一个Sink.class,表示这是一个消费者。与之前的应用程序的一个主要区别在于,注释的方法@StreamListener是将称为Person的POJO作为其参数而不是字符串。将来自Kafka主题的消息转换为此POJO!Spring Cloud Stream提供自动内容类型转换。默认情况下,它application/JSON用作内容类型,但也支持其他内容类型。您可以使用该属性提供内容类型spring.cloud.stream.bindings.input.contentType,然后将其设置为适当的内容类型,例如application/Avro。
Spring Cloud Stream根据此配置选择适当的消息转换器。如果应用程序想要使用Kafka提供的本机序列化和反序列化而不是使用Spring Cloud Stream提供的消息转换器,则可以设置以下属性。
对于序列化:
spring.cloud.stream.bindings.output.useNativeEncoding=true 

对于反序列化:
spring.cloud.stream.bindings.input.useNativeDecoding=true


自动配置主题
Apache Kafka绑定器提供了一个配置器provisioner ,用于在启动时配置主题。如果在代理上启用了主题创建,则Spring Cloud Stream应用程序可以作为应用程序启动的一部分来创建和配置Kafka主题。
例如,可以向分配器提供分区和其他主题级配置。这些自定义可以在 binder完成,适用于应用程序中使用的所有主题,或者适用于单个生产者和消费者级别。这在应用程序的开发和测试期间尤其方便。有关如何为多个分区配置主题的各种示例

支持消费组和分区
消费者组和分区等众所周知的属性可通过Spring Cloud Stream进行配置。可以通过属性设置消费者组:

spring.cloud.stream.bindings.input.group=group-name

如前所述,在内部,该小组将被翻译成Kafka的消费者群体。在编写生产者应用程序时,Spring Cloud Stream提供了将数据发送到特定分区的选项。在内部,框架再次将这些职责委托给Kafka。
如果禁用消费者组的自动重新平衡,则可以限制特定应用程序实例使用来自某组分区的消息,这是一个需要覆盖的简单配置属性。有关详细信息,请参阅这些配置选项

绑定可视化和控制
使用Spring Boot的执行器机制,我们现在可以控制 Spring Cloud Stream中的各个绑定
在运行时,可以使用执行器端点停止,暂停,恢复应用程序,这是Spring Boot的机制,用于在将应用程序推送到生产环境时监视和管理应用程序。此功能使用户可以对应用程序处理Kafka数据的方式进行更多控制。如果应用程序暂停绑定,则处理来自该特定主题的记录将被暂停,直到恢复为止。
Spring Cloud Stream还与Micrometer集成,可实现更丰富的指标,排放混乱率并提供其他与监控相关的功能。这些可以与许多其他监控系统进一步集成。Kafka活页夹提供了扩展的指标功能,可以提供有关主题的消费者滞后的其他见解。
Spring Boot 通过特殊的健康端点提供应用程序运行状况检查。Kafka绑定器提供了一个特殊的健康指示器实现,它考虑了与代理的连接,并检查所有分区是否健康。如果在没有领导者的情况下找到任何分区,或者无法连接代理,则运行状况检查会相应地报告状态。

下面见Spring Cloud Stream对Kafka Streams的深度支持,可点击Kafka标签进入查看