使用Redis PubSub与Spring Boot实现微服务消息模型 - vinsguru


PubSub是用于微服务体系结构中服务到服务通信的异步消息传递模型:一个服务(发布者,而不是将消息发送给特定的收件人),它发布的消息到一个主题/通道,通过有关各个订阅者(Subscribers)接收消息。
好处:

  1. 一对多通讯,发布者可以发布一条消息,其中N个订阅者可以接收并对消息做出反应。
  2. 松耦合,服务不是紧密耦合的。任何服务都可以使用/忽略消息
  3. 更好的性能,发布者不必调用N个服务。相反,它只是将消息发布到主题中。它不必对订户有任何了解,不会被堵塞。

缺点:
  1. PubSub是一劳永逸的模型。如果收件者处于脱机状态,则他们可能不会收到该消息。
  2. PubSub是扇出模型。也就是说,同一服务的多个实例将收到该消息。

我们将创建一个2个简单的Spring Boot应用程序。1个将扮演发布者的角色,另一个将成为订阅者。
  • 发布者:此应用程序将定期发布笑话
  • 订户:可以有N个订户。在我们的情况下,我们将有1个订阅者。
    • 当新玩笑发布时,将通知该订户。
    • 订户可以用这个玩笑做任何事情。在我们的情况下,我们将只在控制台上打印。

为笑话创建一个DTO:

@Data
public class Joke implements Serializable {

    private static final String JOKE_FORMAT = "Q: %s \nA: %s";

    private String setup;
    private String punchline;

    @Override
    public String toString() {
        return String.format(JOKE_FORMAT, this.setup, this.punchline);
    }
}

Redis PubSub –发布者:
@EnableScheduling
@SpringBootApplication
public class RedisPublisherApplication {

    public static void main(String[] args) {
        SpringApplication.run(RedisPublisherApplication.class, args);
    }

    @Bean
    public ReactiveRedisOperations<String, Joke> jokeTemplate(LettuceConnectionFactory lettuceConnectionFactory){
        RedisSerializer<Joke> valueSerializer = new Jackson2JsonRedisSerializer<>(Joke.class);
        RedisSerializationContext<String, Joke> serializationContext = RedisSerializationContext.<String, Joke>newSerializationContext(RedisSerializer.string())
                .value(valueSerializer)
                .build();
        return new ReactiveRedisTemplate<String, Joke>(lettuceConnectionFactory, serializationContext);
    }
}

  • 然后,我们自动连接ReactiveRedisOperation以每3秒定期发布一次消息。

@Service
public class PublisherService {

    private static final String JOKE_API_ENDPOINT = "https://official-joke-api.appspot.com/jokes/random";
    private WebClient webClient;

    @Autowired
    private ReactiveRedisOperations<String, Joke> redisTemplate;

    @Value(
"${topic.name:joke-channel}")
    private String topic;

    @PostConstruct
    private void init(){
        this.webClient = WebClient.builder()
                .baseUrl(JOKE_API_ENDPOINT)
                .build();
    }

    @Scheduled(fixedRate = 3000)
    public void publish(){
        this.webClient.get()
                .retrieve()
                .bodyToMono(Joke.class)
                .flatMap(joke -> this.redisTemplate.convertAndSend(topic, joke))
                .subscribe();
    }

}

Redis PubSub –订阅者:
订户方相对非常简单。在这里,我们只需订阅(侦听)频道,即可在控制台上打印值。我们可以包含多个频道名称。

@Service
public class SubscriberService {

    @Autowired
    private ReactiveRedisOperations<String, Joke> reactiveRedisTemplate;

    @Value("${topic.name:joke-channel}")
    private String topic;

    @PostConstruct
    private void init(){
        this.reactiveRedisTemplate
                .listenTo(ChannelTopic.of(topic))
                .map(ReactiveSubscription.Message::getMessage)
                .subscribe(System.out::println);
    }

}

 

Dockerizing基础架构:
我在Dockerfile下面使用dockerize发布者和订阅者应用程序。

# Use JRE11 slim
FROM openjdk:11.0-jre-slim

# Add the app jar
ADD target/*.jar redis-pubsub.jar

ENTRYPOINT java -jar redis-pubsub.jar

具有所有依赖项的docker-compose文件

version: '3'
services:
  redis:
    image: redis
    ports:
      - 6379:6379
  publisher:
    build: ./redis-publisher
    image: vinsdocker/redis-publisher
    depends_on:
      - redis
    environment:
      - SPRING_REDIS_HOST=redis
  subscriber:
    build: ./redis-subscriber
    image: vinsdocker/redis-subscriber
    depends_on:
      - redis
    environment:
      - SPRING_REDIS_HOST=redis


 
演示:
一切准备就绪后,我将一一运行这些命令。

  1. 建立项目mvn clean package -DskipTests
  2. 构建Docker映像docker-compose build
  3. 运行应用程序docker-compose up

 
总结
通过开发2个简单的微服务,我们能够通过Spring Boot成功演示Redis PubSub。正如我们在上方看到的,发布者和订阅者之间并没有紧密耦合,但是他们仍然能够通过Redis PubSub功能进行通信。
源代码可在此处获得