Kafka Streams+SpringBoot之探索:统计计数 - mydeveloperplanet


本示例接上一个案例,其中有发送消息的案例,这里只是消费者举例,我们将从Tweets my-kafka-streams-topic中读取流,创建一个以#标签为值的新中间流,将其转换存都KTable,包含每个#标签的数量,然后将其发布到topic my-kafka-streams-out2。这是一个Ktable。

我们向kafkaprocessingplanet模块添加了一个新的REST端点。我们将阅读同一主题my-kafka-streams-topic,并将其进行转换,实现主题标签作为key和计数作为值,发布消息到my-kafka-streams-hashtagcount-output。我们假定消息中始终有一个#号标签,并且它在消息的末尾出现,只是为了使事情简单。

Kafka属性与第一个示例相同,不同之处在于我们确实提供了key的序列化/反序列化的配置,因为我们将发布一个以String作为key的主题。否则,您将遇到ClassCastException。

props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());

首先,我们将创建一个新标签流,该流将仅包含主题标签作为值。这可以通过以下方式实现:

KStream<Object, String> hashTags = source.flatMapValues(new ValueMapper<String, Iterable>() {
    @Override
    public Iterable apply(String value) {
      return Arrays.asList(value.substring(value.indexOf("#")));
    }
});

为了简洁起见,我们将其转换为lambda:

KStream<Object, String> hashTags = source.flatMapValues((ValueMapper<String, Iterable>) value -> Arrays.asList(value.substring(value.indexOf("#"))));

我们有一个流,其中没有键key和主题标签作为值。为了知道某个标签在流中出现多少次,我们需要每个标签的记录。因此,我们创建一个KGroupedStream带有标签作为键key和标签为值。将KeyValueMapper用于创建keyy。我们将原始键和值作为输入参数提供给apply方法,并将新键作为返回String。

KGroupedStream<String, String> hashTags = source.flatMapValues((ValueMapper<String, Iterable>) value -> Arrays.asList(value.substring(value.indexOf("#"))))
  .groupBy(new KeyValueMapper<Object, String, String>() {
      @Override
      public String apply(Object key, String value) {
        return value;
      }
});

同样,我们将其转换为lambda:

KGroupedStream<String, String> hashTags = source.flatMapValues((ValueMapper<String, Iterable>) value -> Arrays.asList(value.substring(value.indexOf("#")))) 
  .groupBy((key, value) -> value)

接下来,我们调用的KGroupedStream的count方法,将为KTable counts我们创建一个标签,标签为键,计数为值。

KTable<String, Long> counts = source.flatMapValues((ValueMapper<String, Iterable>) value -> Arrays.asList(value.substring(value.indexOf("#"))))
  .groupBy((key, value) -> value)
  .count();

现在我们唯一需要做的就是创建一个流,并将其发布到主题my-kafka-streams-hashtagcount-output。

source.flatMapValues((ValueMapper<String, Iterable>) value -> Arrays.asList(value.substring(value.indexOf("#"))))
  .groupBy((key, value) -> value)
  .count()
  .toStream()
  .to("my-kafka-streams-hashtagcount-output", Produced.with(Serdes.String(), Serdes.Long()));

最后,我们需要添加以下内容以启动流:

final Topology topology = builder.build();
streams2 = new KafkaStreams(topology, props);
streams2.start();

输出主题必须配置为启用日志压缩。请参阅官方文档简短说明以了解日志压缩。日志压缩可确保将多个具有相同键的记录发布到某个主题时,仅保留最新版本。仅最后一个版本有用,如果未启用日志压缩,则旧版本将消耗不必要的资源。启用日志压缩后,Kafka将清除较旧的版本。有了这些信息,我们可以创建输出主题:

$ ./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic my-kafka-streams-hashtagcount-output --config "cleanup.policy=compact"

测试所有内容了。像以前一样启动Producer和Processor Spring Boot应用程序。

过调用URL http:// localhost:8081 / sendMessages /将消息从生产者发送到my-kafka-streams-topic;通过调用URL http:// localhost:8082 / startProcessing2 /来启动流处理器。

使用一些额外的属性启动主题的使用者,以便将数据正确打印到控制台。我们注意到每隔30秒就会有新结果打印到控制台。

$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-kafka-streams-hashtagcount-output --property print.key=true --value-deserializer=org.apache.kafka.common.serialization.LongDeserializer
italy 85
latin 84
cicero 100
caesar 87

除了使用count方法之外,我们还可以使用以下方法:

count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))

在这种情况下,结果将存储在可以查询的状态存储中。如何查询这些状态存储可以在这里找到。
帖子中使用的资源当然可以在GitHub上找到