使用Spring Request-Reply实现基于Kafka的同步请求响应

大家提到Kafka时第一印象就是它是一个快速的异步消息处理系统,不同于通常tomcat之类应用服务器和前端之间的请求/响应方式请求,客户端发出一个请求,必然会等到一个响应,这种方式对Kafka来说好像不适合,因为Kafka是一种事件驱动方式,通过事件才能激活一个响应,但是,问题来了,很多人习惯请求响应模型以后很难接受这种事件响应模型,包括发布订阅模型。

当然,Kafka不是不能实现通常的请求响应模型,只要使用两个Kafka主题,一个是负责请求的主题,另外一个是负责响应的主题,还必须在消息的生产者记录中构建相关ID,将与消息的消费者记录中的ID进行对应关联起来,实际上就是将请求Id和响应Id进行关联。


客户端---->请求的主题 ----消费者处理请求并把结果发送到---->响应主题--->客户端

随着Spring-Kafka最新版本推出(Spring replying kafka 模板),这种请求-响应模型实现就更加简单了,不需要开发人员自己进行请求Id和响应Id的关联,由Spring kafka模板实现。

下面这个案例例演示了Spring-Kafka是如何实现同步的请求响应模型的,源码见github

下图是本案例的演示架构图,这个案例是以同步行为返回两个数字总和的结果。


客户端-->请求-->RESTcontroll-->Spring-kafka模板-->Kafka请求主题-->Kafka监听器

客户端<--响应<--RESTcontroll<--Spring-kafka模板<--Kafka响应主题<--Kafka监听器

下面我们开始看看开发这个演示步骤:

设置Springboot启动类
首先需要在pom.xml引入Spring kafka模板:


<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

代码如下:

@SpringBootApplication
public class RequestReplyKafkaApplication {

public static void main(String[] args) {
SpringApplication.run(RequestReplyKafkaApplication.class, args);
}
}


设置Spring ReplyingKafkaTemplate
我们需要在Springboot配置类的KafkaConfig对Spring kafka模板进行配置:


@Configuration
public class KafkaConfig {

在这个配置类中,我们需要配置核心的ReplyingKafkaTemplate类,这个类继承了 KafkaTemplate 提供请求/响应的的行为;还有一个生产者工厂(参见 ProducerFactory 下面的代码)和 KafkaMessageListenerContainer。这是最基本的设置,因为请求响应模型需要对应到消息生产者和消费者的行为。


// 这是核心的ReplyingKafkaTemplate
@Bean
public ReplyingKafkaTemplate<String, Model, Model> replyKafkaTemplate(ProducerFactory<String, Model> pf, KafkaMessageListenerContainer<String, Model> container) {
return new ReplyingKafkaTemplate<>(pf, container);
}

// 配件:监听器容器Listener Container to be set up in ReplyingKafkaTemplate
@Bean
public KafkaMessageListenerContainer<String, Model> replyContainer(ConsumerFactory<String, Model> cf) {
ContainerProperties containerProperties = new ContainerProperties(requestReplyTopic);
return new KafkaMessageListenerContainer<>(cf, containerProperties);
}

// 配件:生产者工厂Default Producer Factory to be used in ReplyingKafkaTemplate
@Bean
public ProducerFactory<String,Model> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}

// 配件:kafka生产者的Kafka配置Standard KafkaProducer settings - specifying brokerand serializer
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}

设置spring-Kafka的监听器
这与通常创建的Kafka消费者相同。唯一的变化是额外是在工厂中设置ReplyTemplate,这是必须的,因为消费者需要将计算结果放入到Kafka的响应主题。


//消费者工厂 Default Consumer Factory
@Bean
public ConsumerFactory<String, Model> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(),new StringDeserializer(),new JsonDeserializer<>(Model.class));
}

// 并发监听器容器Concurrent Listner container factory
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Model>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Model> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// NOTE - set up of reply template 设置响应模板
factory.setReplyTemplate(kafkaTemplate());
return factory;
}

// Standard KafkaTemplate
@Bean
public KafkaTemplate<String, Model> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}

编写我们的kafka消费者
这是过去创建的Kafka消费者一样。唯一的变化是附加了@SendTo注释,此注释用于在响应主题上返回业务结果。


@KafkaListener(topics = "${kafka.topic.request-topic}")
@SendTo
public Model listen(Model request) throws InterruptedException {
int sum = request.getFirstNumber() + request.getSecondNumber();
request.setAdditionalProperty(
"sum", sum);
return request;
}

这个消费者用于业务计算,把客户端通过请求传入的两个数字进行相加,然后返回这个请求,通过@SendTo发送到Kafka的响应主题。


总结服务
现在,让我们将所有这些都结合在一起放在RESTcontroller,步骤分为几步,先创建生产者记录,并在记录头部中设置接受响应的Kafka主题,这样
把请求和响应在Kafka那里对应起来,然后通过模板发布消息到Kafka,再通过future.get()堵塞等待Kafka的响应主题发送响应结果过来。这时再
打印结果记录中的头部信息,会看到Spring自动生成相关ID。


@ResponseBody
@PostMapping(value="/sum",produces=MediaType.APPLICATION_JSON_VALUE,consumes=MediaType.APPLICATION_JSON_VALUE)
public Model sum(@RequestBody Model request)throws InterruptedException,ExecutionException {
//创建生产者记录
ProducerRecord<String,Model> record = new ProducerRecord<String,Model>(requestTopic,request);
//在记录头部中设置响应主题
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes()));
//发布到kafka主题中
RequestReplyFuture<String, Model, Model> sendAndReceive = kafkaTemplate.sendAndReceive(record);

//确认生产者是否成功生产
SendResult<String, Model> sendResult = sendAndReceive.getSendFuture().get();

//打印结果记录中所有头部信息 会看到Spring自动生成的相关ID,这个ID是由消费端@SendTo 注释返回的值。
sendResult.getProducerRecord().headers().forEach(header -> System.out.println(header.key() +
":" + header.value().toString()));

//获取消费者记录
ConsumerRecord<String, Model> consumerRecord = sendAndReceive.get();

//返回消费者结果
return consumerRecord.value();
}

并发消费者
即使你要创建请求主题在三个分区中,三个并发的消费者的响应仍然合并到一个Kafka响应主题,这样,Spring侦听器的容器能够完成匹配相关ID的繁重工作。
整个请求/响应的模型是一致的。

现在我们可以再修改启动类如下:


@ComponentScan(basePackages = {
"com.gauravg.config",
"com.gauravg.consumer",
"com.gauravg.controller",
"com.gauravg.model"
})
@SpringBootApplication
public class RequestReplyKafkaApplication {

public static void main(String[] args) {
SpringApplication.run(RequestReplyKafkaApplication.class, args);
}
}

下面开始运行这个案例:
1.下载源码见github
2.先启动kafka
3.直接运行上面启动类
4.通过postman等工具访问:
http://localhost:8080/sum

post数据:


{
"firstNumber": "111",
"secondNumber": "2222"
}

返回结果是:


{
"firstNumber": 111,
"secondNumber": 2222,
"sum": 2333
}

在控制台输出记录头部信息:


kafka_replyTopic:[B@1f59b198
kafka_correlationId:[B@356a7326
__TypeId__:[B@1a9111f

可见,Spring自动生成聚合ID(correlationId),无需我们自己手工比对了。


Synchronous Kafka: Using Spring Request-Reply - DZ
[该贴被admin于2018-07-23 18:11修改过]
[该贴被admin于2018-07-23 18:13修改过]