Spring Apache Kafka (spring-kafka)提供了基于卡夫卡的消息传递解决方案的高级抽象。传统的请求响应模型中,响应容易被堵塞,造成两个系统耦合,调用者需要等待到响应返回才能继续做自己的工作,这在分布式系统中,流量比较大情况下几乎不现实,使用消息模型只能每次请求一个消息,响应再来一个消息,用两个消息组合成请求响应,虽然编程没有传统请求响应方便,但是系统松耦合,相互协调好。
spring-kafka使用起来了很简单:
引入Maven包:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
|
消息生产者代码:
@Autowired KafkaTemplate<String, String> kafkaTemplate;
public void send(@RequestParam String productId) { kafkaTemplate.send("cdProduct", productId); }
|
生产者的配置application.property:
spring.kafka.consumer.group-id=ecomm spring.kafka.bootstrap-servers=localhost:9092
|
消息接受者代码,这里接受到productId以后,查询到Product对象,再给发送者发回去,模拟请求-响应模型:
@KafkaListener(topics = "cdProduct") public void onAction(ConsumerRecord<?, ?> consumerRecord) { System.out.printf("接受到=" + consumerRecord); String productId = (String) consumerRecord.value(); System.out.printf("接受到productId=" + productId); Product product = productRepo.findById(productId).orElse(new Product()); kafkaTemplate.send("cdProductReply", product); }
|
消费者需要配置JSON序列化将Product变成JSON,这里只要配置在application.property中即可,无需做代码生成自己ProducerFactory工厂:
spring.kafka.consumer.group-id=ecomm spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
|
生产者接受到消费者查询到的Product放入自己的缓存。
@KafkaListener(topics = "cdProductReply") public void onAction(Product product) { System.out.printf("接受到新的Product" + product.getName()); cache.put(product.getId(), product);
}
|
为了实现Product直接序列化接受,需要在自己的入口类Application中加入:
@Bean public StringJsonMessageConverter jsonConverter() { return new StringJsonMessageConverter(); }
|
无需配置监听器连接工厂ConcurrentKafkaListenerContainerFactory即可有用。