PubSub是用于微服务体系结构中服务到服务通信的异步消息传递模型:一个服务(发布者,而不是将消息发送给特定的收件人),它发布的消息到一个主题/通道,通过有关各个订阅者(Subscribers)接收消息。
好处:
- 一对多通讯,发布者可以发布一条消息,其中N个订阅者可以接收并对消息做出反应。
- 松耦合,服务不是紧密耦合的。任何服务都可以使用/忽略消息
- 更好的性能,发布者不必调用N个服务。相反,它只是将消息发布到主题中。它不必对订户有任何了解,不会被堵塞。
缺点:
- PubSub是一劳永逸的模型。如果收件者处于脱机状态,则他们可能不会收到该消息。
- PubSub是扇出模型。也就是说,同一服务的多个实例将收到该消息。
我们将创建一个2个简单的Spring Boot应用程序。1个将扮演发布者的角色,另一个将成为订阅者。
- 发布者:此应用程序将定期发布笑话
- 订户:可以有N个订户。在我们的情况下,我们将有1个订阅者。
- 当新玩笑发布时,将通知该订户。
- 订户可以用这个玩笑做任何事情。在我们的情况下,我们将只在控制台上打印。
为笑话创建一个DTO:@Data public class Joke implements Serializable {
private static final String JOKE_FORMAT = "Q: %s \nA: %s";
private String setup; private String punchline;
@Override public String toString() { return String.format(JOKE_FORMAT, this.setup, this.punchline); } }
|
Redis PubSub –发布者:
@EnableScheduling @SpringBootApplication public class RedisPublisherApplication {
public static void main(String[] args) { SpringApplication.run(RedisPublisherApplication.class, args); }
@Bean public ReactiveRedisOperations<String, Joke> jokeTemplate(LettuceConnectionFactory lettuceConnectionFactory){ RedisSerializer<Joke> valueSerializer = new Jackson2JsonRedisSerializer<>(Joke.class); RedisSerializationContext<String, Joke> serializationContext = RedisSerializationContext.<String, Joke>newSerializationContext(RedisSerializer.string()) .value(valueSerializer) .build(); return new ReactiveRedisTemplate<String, Joke>(lettuceConnectionFactory, serializationContext); } }
|
- 然后,我们自动连接ReactiveRedisOperation以每3秒定期发布一次消息。
@Service public class PublisherService {
private static final String JOKE_API_ENDPOINT = "https://official-joke-api.appspot.com/jokes/random"; private WebClient webClient;
@Autowired private ReactiveRedisOperations<String, Joke> redisTemplate;
@Value("${topic.name:joke-channel}") private String topic;
@PostConstruct private void init(){ this.webClient = WebClient.builder() .baseUrl(JOKE_API_ENDPOINT) .build(); }
@Scheduled(fixedRate = 3000) public void publish(){ this.webClient.get() .retrieve() .bodyToMono(Joke.class) .flatMap(joke -> this.redisTemplate.convertAndSend(topic, joke)) .subscribe(); }
}
|
Redis PubSub –订阅者:
订户方相对非常简单。在这里,我们只需订阅(侦听)频道,即可在控制台上打印值。我们可以包含多个频道名称。
@Service public class SubscriberService {
@Autowired private ReactiveRedisOperations<String, Joke> reactiveRedisTemplate;
@Value("${topic.name:joke-channel}") private String topic;
@PostConstruct private void init(){ this.reactiveRedisTemplate .listenTo(ChannelTopic.of(topic)) .map(ReactiveSubscription.Message::getMessage) .subscribe(System.out::println); }
}
|
Dockerizing基础架构:
我在Dockerfile下面使用dockerize发布者和订阅者应用程序。
# Use JRE11 slim FROM openjdk:11.0-jre-slim
# Add the app jar ADD target/*.jar redis-pubsub.jar
ENTRYPOINT java -jar redis-pubsub.jar
|
具有所有依赖项的docker-compose文件
version: '3' services: redis: image: redis ports: - 6379:6379 publisher: build: ./redis-publisher image: vinsdocker/redis-publisher depends_on: - redis environment: - SPRING_REDIS_HOST=redis subscriber: build: ./redis-subscriber image: vinsdocker/redis-subscriber depends_on: - redis environment: - SPRING_REDIS_HOST=redis
|
演示:
一切准备就绪后,我将一一运行这些命令。
- 建立项目mvn clean package -DskipTests
- 构建Docker映像docker-compose build
- 运行应用程序docker-compose up
总结
通过开发2个简单的微服务,我们能够通过Spring Boot成功演示Redis PubSub。正如我们在上方看到的,发布者和订阅者之间并没有紧密耦合,但是他们仍然能够通过Redis PubSub功能进行通信。
源代码可在此处获得。