Spring Cloud Stream对Kafka Streams的深度支持


在编写流处理应用程序时,Spring Cloud Stream提供了另一个专门用于Kafka Streams的绑定器。与常规Kafka绑定器一样,Kafka Streams绑定器也专注于开发人员的工作效率,因此开发人员可以专注于为KStream,KTable,GlobalKTable等编写业务逻辑,而不是基础架构代码。绑定器负责连接到Kafka,以及创建,配置和维护流和主题。例如,如果应用程序方法具有KStream签名,则活页夹将连接到目标主题并在后台从其中流式传输。应用程序开发人员不必明确地这样做,因为绑定程序已经为应用程序提供了它。
这同样适用于其他类型,如KTable和GlobalKTable。底层KafkaStreams对象由用于依赖注入的绑定器提供,因此应用程序不直接维护它。而是由Spring Cloud Stream为您完成的。
要使用Spring Cloud Stream开始使用Kafka Streams,请转到Spring Initializr并选择下图中显示的选项,以生成一个应用程序,其中包含使用Spring Cloud Stream编写Kafka Streams应用程序的依赖项.
下面的示例显示了使用Spring Cloud Stream编写的Kafka Streams应用程序:

@SpringBootApplication
public class KafkaStreamsTableJoin {

  @EnableBinding(StreamTableProcessor.class)
  public static class KStreamToTableJoinApplication {

     @StreamListener
     @SendTo("output")
     public KStream<String, Long> process(@Input(
"input1") KStream<String, Long> userClicksStream,
                                 @Input(
"input2") KTable<String, String> userRegionsTable) {

        return userClicksStream
              .leftJoin(userRegionsTable,
                    (clicks, region) -> new RegionWithClicks(region == null ?
"UNKNOWN" : region, clicks),
                    Joined.with(Serdes.String(), Serdes.Long(), null))
              .map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(), regionWithClicks.getClicks()))
              .groupByKey(Serialized.with(Serdes.String(), Serdes.Long()))
              .reduce((firstClicks, secondClicks) -> firstClicks + secondClicks)
              .toStream();
     }
  }

  interface StreamTableProcessor {

     @Input(
"input1")
     KStream inputStream();

     @Output(
"output")
     KStreamoutputStream();

     @Input(
"input2")
     KTable inputTable();
  }
}

在前面的代码中有一些注意事项。在该@StreamListener方法中,没有用于设置Kafka Streams组件的代码。应用程序不需要构建流拓扑来关联KStream或关联KTableKafka主题,启动和停止流等。所有这些机制都由用于Kafka StreamsSpring Cloud Stream绑定器处理。在调用方法时,已经创建了一个 KStream和一个KTable并且可供应用程序使用。
应用程序创建一个名为的自定义接口,StreamTableProcessor它指定用于输入和输出绑定的Kafka Streams类型。此接口用于@EnableBinding。与常规Kafka绑定器类似,Kafka上的目标也是由Spring Cloud Stream属性指定的。您可以为前面的应用程序提供这些配置选项,以创建必要的流和表:

spring.cloud.stream.bindings.input1.destination = userClicksTopic 
spring.cloud.stream.bindings.input2.destination = userRegionsTopic 
spring.cloud-stream.bindings.output.destination = userClickRegionsTopic

我们使用两个Kafka主题来创建传入流:一个用于消息消息KStream,另一个用于消息KTable。框架根据自定义界面中提供的绑定适当地使用所需类型StreamTableProcessor。然后,这些类型将与方法签名配对,以便在应用程序代码中使用。在出站时,传出KStream将发送到输出Kafka主题。

Kafka Streams中的可查询状态存储支持
Kafka Streams提供了用于编写有状态应用程序的第一类基元。当使用Spring Cloud Stream和Kafka Streams构建有状态应用程序时,可以使用RESTful应用程序从RocksDB中的持久状态存储中提取信息。请参阅下面的Spring REST应用程序示例,该应用程序依赖于Kafka Streams的状态存储:

@RestController
public class FooController {

  private final Log logger = LogFactory.getLog(getClass());

  @Autowired
  private InteractiveQueryService interactiveQueryService;

@RequestMapping("/song/id")
public SongBean song(@RequestParam(value=
"id") Long id) {

     final ReadOnlyKeyValueStore<Long, Song> songStore =
           interactiveQueryService.getQueryableStore(“STORE-NAME”,
 QueryableStoreTypes.<Long, Song>keyValueStore());

     final Song song = songStore.get(id);
     if (song == null) {
        throw new IllegalArgumentException(
"Song not found.");
     }
     return new SongBean(song.getArtist(), song.getAlbum(), song.getName());
  }
}

InteractiveQueryService是Apache Kafka Streams绑定器提供的API,应用程序可以使用它从状态存储中检索。应用程序可以使用此服务按名称查询状态存储,而不是通过底层流基础结构直接访问状态存储。当多个Kafka Streams应用程序实例运行时,此服务还提供用户友好的方式来访问服务器主机信息,并在其间分布分区。
通常在这种情况下,应用程序必须通过直接访问Kafka Streams API来找到托管key分区所在的主机。将InteractiveQueryService提供围绕这些API方法的包装。一旦应用程序获得对状态存储的访问权限,它就可以通过查询来制定进一步的计划方案。最终,这些方案可以通过REST端点提供,如上所示。您可以在使用Spring Cloud Stream编写的Kafka Streams应用程序的 GitHub上找到这个示例,其中它使用本节中提到的功能适应Kafka音乐示例

在Kafka Streams分支
通过使用SendTo注释,可以在Spring Cloud Stream中本地使用Kafka Streams的分支功能。

@StreamListener("input")
@SendTo({“englishTopic”, “frenchTopic”, “spanishTopic”})
public KStream<?, WordCount>[] process(KStream<Object, String> input) {

  Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals(
"english");
  Predicate<Object, WordCount> isFrench =  (k, v) -> v.word.equals(
"french");
  Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals(
"spanish");

  return input
        .flatMapValues(value -> Arrays.asList(value.toLowerCase().split(
"\\W+")))
        .groupBy((key, value) -> value)
        .windowedBy(timeWindows)
        .count(Materialized.as(
"WordCounts-1"))
        .toStream()
        .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))))
        .branch(isEnglish, isFrench, isSpanish);
}

请注意,SendTo注释具有三个不同输出的绑定,并且方法本身返回一个KStream[]。Spring Cloud Stream在内部将分支发送到输出绑定到的Kafka主题。观察SendTo注释上指定的输出顺序。这些输出绑定将KStream[]按照它在数组中的顺序与传出配对。

KStream可被映射到数组的第一索引englishTopic,那么下一个frenchTopic等等,等等。这里的想法是,应用程序可以专注于函数方面,并使用Spring Cloud Stream设置所有这些输出流,否则开发人员必须为每个流单独执行。

Spring Cloud Stream中的错误处理
Spring Cloud Stream提供了处理失败消息的错误处理机制。它们可以被发送到死信队列(DLQ),这是由Spring Cloud Stream创建的特殊Kafka主题。将失败的记录发送到DLQ时,会将标头添加到包含有关失败的更多信息的记录中,例如异常堆栈跟踪,消息等。
发送到DLQ是可选的,框架提供各种配置选项来自定义它。
对于Spring Cloud Stream中的Kafka Streams应用程序,错误处理主要集中在反序列化错误上。Apache Kafka Streams绑定器提供了使用Kafka Streams提供的反序列化处理程序的功能。它还提供在主流继续处理时将失败的记录发送到DLQ的能力。当应用程序需要返回访问错误记录时,这很有用。

Schema evolution和Confluent Schema Registry
Spring Cloud Stream 通过提供与Confluent Schema Registry以及Spring Cloud Stream提供的本机模式注册表服务器一起使用的功能来支持模式演变。应用程序通过在应用程序级别包含@EnableSchemaRegistryClient注释来启用架构注册表。Spring Cloud Stream提供各种基于Avro的消息转换器,可以方便地与模式演变一起使用。使用Confluent Schema Registry时,Spring Cloud Stream提供了一个特殊的客户端实现(ConfluentSchemaRegistryClient),应用程序需要将其作为SchemaRegistryClient  bean 提供。