Spring Boot的微服务分散聚集模式教程与源码 - vinsguru

20-12-24 banq

本教程演示分散聚集模式( Scatter Gather Pattern),它是分布式系统体系结构的企业集成模式之一。

让我们考虑一个需要完成一组任务以完成业务工作流程的应用程序。如果这些任务彼此不依赖,那么按顺序执行它们就没有意义。我们可以并行执行这些任务。

分散收集模式可帮助我们分发这些任务,以实现任务/消息/事件的并行处理,然后最终将响应汇总为单个响应,如上所示。

 

案例说明

一个用户预订机票的机票预订应用程序。该应用程序将信息发送给所有航空公司,找到他们的票价,然后进行回复。

由于我们的应用程序依赖于第三方API,并且我们需要为用户提供最佳的用户体验,因此我们将向所有航空公司发布用户请求,无论在特定的超时时间内做出回应,我们都会收集所有结果并向我们显示前5名交易用户。

主应用程序甚至不知道有多少航空公司正在监听请求。即使某些航空公司的服务无法正常运行,也不会影响我们的飞行应用程序。

 

案例SpringBoot实现

我们的项目依赖于超快速的NATS消息服务器。因此,要添加此依赖项:

<dependency>
    <groupId>io.nats</groupId>
    <artifactId>jnats</artifactId>
    <version>2.6.8</version>
</dependency>

发布聚集模式的实现服务类ScatterGatherService:

@Service
public class ScatterGatherService {

    @Autowired
    private Connection nats;

    public Mono<FlightSearchResponse> broadcast(FlightSearchRequest flightSearchRequest){
        // create inbox
        String inbox = nats.createInbox();
        Subscription subscription = nats.subscribe(inbox);
        return Flux.generate((SynchronousSink<FlightSchedule[]> fluxSink) -> receiveSchedules(fluxSink, subscription))
                    .flatMap(Flux::fromArray)
                    .bufferTimeout(5, Duration.ofSeconds(1))
                    .map(list -> {
                        list.sort(Comparator.comparing(FlightSchedule::getPrice));
                        return list;
                    })
                    .map(list -> FlightSearchResponse.of(flightSearchRequest, list))
                    .next()
                    .doFirst(() -> nats.publish("flight.search", inbox, ObjectUtil.toBytes(flightSearchRequest)))
                    .doOnNext(i -> subscription.unsubscribe());
    }

    private void receiveSchedules(SynchronousSink<FlightSchedule[]> synchronousSink, Subscription subscription){
        try{
            Message message = subscription.nextMessage(Duration.ofSeconds(1));
            ObjectUtil.toObject(message.getData(), FlightSchedule[].class).ifPresent(synchronousSink::next);
        }catch (Exception e){
            synchronousSink.error(e);
        }
    }

} 
 

演示

发送一个请求:http://localhost:8080/flight/Houston/LasVegas,收到如下所示的回复:

{
   "searchRequest":{
      "from":"Houston",
      "to":"LasVegas"
   },
   "schedules":[
      {
         "date":"2021-01-02",
         "price":72,
         "airline":"DELTA"
      },
      {
         "date":"2020-12-28",
         "price":87,
         "airline":"UNITED_AIRLINE"
      },
      {
         "date":"2021-01-02",
         "price":109,
         "airline":"FRONTIER"
      },
      {
         "date":"2021-01-08",
         "price":229,
         "airline":"UNITED_AIRLINE"
      },
      {
         "date":"2021-01-02",
         "price":408,
         "airline":"DELTA"
      }
   ]
}

成功证明在我们的微服务架构中使用分散收集模式可以有效地并行处理任务并最终汇总结果。

源代码可在此处获得

更详细点击标题见原文。

              

猜你喜欢