Kafka Streams+SpringBoot之探索:将一个流转换到另外一个流 - mydeveloperplanet


为了从主题中读取无限制的数据流,我们需要创建一个小型应用程序,以发送无限制的数据流。我们模拟一条Tweet流,在Tweet末尾恰好有一个标签。每秒都会在该主题上发布一条消息。Tweets始终包含相同的消息(Lorem ipsum…),主题标签是从5个主题标签的固定列表中随机选择的。

@RestController
public class KafkaProducerController {
 
  private static final String loremIpsum = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor " +
     
"incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco " +
     
"laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit " +
     
"esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa " +
     
"qui officia deserunt mollit anim id est laborum.";
 
  private static final String[] hashTags = {
"latin", "italy", "roman", "caesar", "cicero"};
 
  private Random randomNumber = new Random();
 
  private String randomMessage;
 
  @RequestMapping(
"/sendMessages/")
  public void sendMessages() {
 
    Properties props = new Properties();
    props.put(
"bootstrap.servers", "localhost:9092");
    props.put(
"acks", "all");
    props.put(
"key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put(
"value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 
    Producer<String, String> producer = new KafkaProducer<>(props);
 
    try {
      while (true) {
       
// Every second send a message
        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {}
 
        randomMessage = loremIpsum +
" #" + hashTags[randomNumber.nextInt(hashTags.length)];
        producer.send(new ProducerRecord<String, String>(
"my-kafka-streams-topic", null, randomMessage));
 
      }
    } finally {
      producer.close();
    }
 
  }
 
}

变量loremIpsum是固定消息,变量hashTags是主题标签的固定列表。在该sendMessages方法中,我们首先定义一些Kafka属性。在第一个try块中,无休止的循环每秒向主题发送一条带有随机主题标签(变量randomMessage)的消息。my-kafka-streams-topic。我们只为该主题提供一个值,而不是一个键,因为该主题中的键与我们无关。
最后要做的是在Kafka中创建主题。我们通过Kafka的bin目录中的kafka-topics.sh 脚本来执行此操作 :
$ ./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic my-kafka-streams-topic

可以通过调用URL http:// localhost:8081 / sendMessages /来启动数据流。

现在我们已经做好了所有准备,现在该开始第一个流媒体示例了。我们将从中读取Tweets my-kafka-streams-topic,使用主题标签过滤Tweets latin并将其发布到主题my-kafka-streams-out1。
我们创建一个Spring Boot应用程序(mykafkaprocessingplanet多模块Maven项目中的模块),并将以下依赖项添加到pom中:

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.3.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.3.0</version>
  </dependency>
</dependencies>

可以看出,除了kafka-streams依赖之外,我们还必须添加kafka-clients依赖。如果不添加最后一个依赖项,则在创建时会遇到以下错误KStream:

java.lang.NoSuchMethodError: org.apache.kafka.clients.admin.AdminClientConfig

与生产者一样,我们确保可以在调用URL之后开始处理。这URL的KafkaProcessingController:

private KafkaStreams streams1;
 
@RequestMapping("/startProcessing1/")
public void startProcessing1() {
 
  Properties props = new Properties();
  props.put(StreamsConfig.APPLICATION_ID_CONFIG,
"processor1");
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
  final StreamsBuilder builder = new StreamsBuilder();
 
//魔术关键行
  builder.stream(
"my-kafka-streams-topic").filter((key, value) -> ((String) value).endsWith("latin")).to("my-kafka-streams-out1");
 
  final Topology topology = builder.build();
  streams1 = new KafkaStreams(topology, props);
  streams1.start();
 
}

与往常一样,首先我们需要定义属性以连接到主题。注意,我们没有覆盖默认键Serializer / Deserializer,因为我们没有为主题中的消息提供键。注意魔术关键行,我们从my-kafka-streams-topic中读取流,过滤以结“latin”结尾的消息,并将过滤后的消息发送到主题my-kafka-streams-out1

最后要做的是创建输出主题:

$ ./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic my-kafka-streams-out1

是时候把所有东西放在一起了。通过从mykafkaproducerplanet目录调用以下命令来启动生产者:
$ mvn spring-boot:run

执行以下命令来启动Consumer :

$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-kafka-streams-out1 --from-beginning

通过调用URL http:// localhost:8081 / sendMessages /将消息从生产者发送到my-kafka-streams-topic,通过调用URL http:// localhost:8082 / startProcessing1 /实现流转发处理;可以通过调用URL http:// localhost:8082 / stop来停止流处理。

帖子中使用的资源当然可以在GitHub上找到。点击标题见原文