使用 WebClient 执行同步请求

在本教程中,我们演示如何使用WebClient管理同步通信,WebClient 是一种专为反应式编程设计但能够进行阻塞调用的工具。

随着反应式编程变得越来越普及,我们将研究此类阻塞请求仍然适当且必要的场景。

Spring 中的 HTTP 客户端库概述
让我们首先简要回顾一下当前可用的客户端库,看看我们的WebClient适合哪里。

RestTemplate在 Spring Framework 3.0 中推出后,因其用于 HTTP 请求的简单模板方法 API 而广受欢迎。然而,其同步特性和许多重载方法导致高流量应用程序中的复杂性和性能瓶颈。

在Spring 5.0中,引入了WebClient作为非阻塞请求的更高效、更响应式的替代方案。尽管它是响应式堆栈 Web 框架的一部分,但它支持用于同步和异步通信的流畅 API。

在 Spring Framework 6.1 中,RestClient提供了另一种执行 REST 调用的选项。它将WebClient的流畅 API与RestTemplate的基础架构(包括消息转换器、请求工厂和拦截器)相结合。

RestClient针对同步请求进行了优化,但如果我们的应用程序还需要异步或流式传输功能,则WebClient会更好。使用WebClient进行阻塞和非阻塞 API 调用,我们可以保持代码库的一致性,并避免混合不同的客户端库。

阻塞与非阻塞 API 调用
在讨论各种 HTTP 客户端时,我们使用了同步和异步、阻塞和非阻塞等术语。这些术语与上下文相关,有时可能代表同一概念的不同名称。

在方法调用方面,WebClient根据其发送和接收 HTTP 请求和响应的方式支持同步和异步交互。如果它等待前一个请求完成后再继续执行后续请求,则它以阻塞方式执行此操作,并且结果将同步返回。

另一方面,我们可以通过执行立即返回的非阻塞调用来实现异步交互。在等待另一个系统的响应时,其他处理可以继续,一旦准备就绪,结果就会异步提供。

何时使用同步请求
如上所述,WebClient是 Spring Webflux框架的一部分,默认情况下,该框架中的所有内容都是响应式的。但是,该库提供异步和同步操作支持,使其适用于响应式和 servlet 堆栈 Web 应用程序。

当需要立即反馈时(例如在测试或原型设计期间),以阻塞方式使用WebClient是合适的。这种方法使我们在考虑性能优化之前可以专注于功能。

许多现有应用程序仍在使用RestTemplate等阻塞客户端。由于RestTemplate从 Spring 5.0 开始处于维护模式,因此重构遗留代码库将需要更新依赖项,并可能需要过渡到非阻塞架构。在这种情况下,我们可以暂时以阻塞方式使用WebClient 。

即使在新项目中,某些应用程序部分也可以设计为同步工作流。这可以包括对各种外部系统的顺序 API 调用等场景,其中一个调用的结果对于下一个调用是必需的。WebClient 可以处理阻塞和非阻塞调用,而不必使用不同的客户端。

我们稍后会看到,同步和异步执行之间的切换相对简单。只要有可能,我们就应该避免使用阻塞调用,尤其是在使用反应堆栈时。

使用 WebClient 进行同步 API 调用
发送 HTTP 请求时,WebClient会从Reactor Core库中返回两种反应数据类型之一- Mono或Flux。这些返回类型表示数据流,其中Mono对应于单个值或空结果,而Flux指的是零个或多个值的流。拥有异步和非阻塞 API 可让调用者决定何时以及如何订阅,从而保持代码的反应性。

但是,如果我们想模拟同步行为,我们可以调用可用的block()方法。它将阻止当前操作以获取结果。

更准确地说,block()方法触发对反应流的新订阅,从而启动从源到消费者的数据流。在内部,它使用 CountDownLatch 等待流完成,这会暂停当前线程直到操作完成,即直到Mono或Flux发出结果。block ()方法将非阻塞操作转换为传统的阻塞操作,导致调用线程等待结果。 

实例
让我们看看实际效果。想象一个位于客户端应用程序和两个后端应用程序(客户和计费系统)之间的简单API 网关应用程序。第一个保存客户信息,而第二个提供账单详细信息。不同的客户端通过北向API 与我们的 API 网关交互,北向 API 是向客户端公开的接口,用于检索客户信息,包括他们的账单详细信息:

@GetMapping("/{id}")
CustomerInfo getCustomerInfo(@PathVariable(
"id") Long customerId) {
    return customerInfoService.getCustomerInfo(customerId);
}

模型类如下所示:

public class CustomerInfo {
    private Long customerId;
    private String customerName;
    private Double balance;
    // standard getters and setters
}

API 网关通过提供与客户和计费应用程序进行内部通信的单一端点简化了流程。然后,它会汇总来自两个系统的数据。

考虑一下我们在整个系统中使用同步 API 的场景。但是,我们最近升级了客户和计费系统以处理异步和非阻塞操作。让我们看看这两个向南的 API 现在是什么样子。

客户 API:

@GetMapping("/{id}")
Mono<Customer> getCustomer(@PathVariable(
"id") Long customerId) throws InterruptedException {
    TimeUnit.SECONDS.sleep(SLEEP_DURATION.getSeconds());
    return Mono.just(customerService.getBy(customerId));
}

计费 API:

@GetMapping("/{id}")
Mono<Billing> getBilling(@PathVariable(
"id") Long customerId) throws InterruptedException {
    TimeUnit.SECONDS.sleep(SLEEP_DURATION.getSeconds());
    return Mono.just(billingService.getBy(customerId));
}

在实际情况下,这些 API 将是单独组件的一部分。但是,为了简单起见,我们已将它们组织到代码中的不同包中。此外,为了进行测试,我们引入了延迟来模拟网络延迟:

public static final Duration SLEEP_DURATION = Duration.ofSeconds(2);

与两个后端系统不同,我们的 API 网关应用程序必须公开同步、阻塞 API,以避免破坏客户端契约。因此,那里没有任何变化。

业务逻辑位于CustomerInfoService中。首先,我们将使用WebClient从客户系统中检索数据:

Customer customer = webClient.get()
  .uri(uriBuilder -> uriBuilder.path(CustomerController.PATH_CUSTOMER)
    .pathSegment(String.valueOf(customerId))
    .build())
  .retrieve()
  .onStatus(status -> status.is5xxServerError() || status.is4xxClientError(), response -> response.bodyToMono(String.class)
    .map(ApiGatewayException::new))
  .bodyToMono(Customer.class)
  .block();

接下来是计费系统:

Billing billing = webClient.get()
  .uri(uriBuilder -> uriBuilder.path(BillingController.PATH_BILLING)
    .pathSegment(String.valueOf(customerId))
    .build())
  .retrieve()
  .onStatus(status -> status.is5xxServerError() || status.is4xxClientError(), response -> response.bodyToMono(String.class)
    .map(ApiGatewayException::new))
  .bodyToMono(Billing.class)
  .block();

最后,使用两个组件的响应,我们将构建一个响应:

new CustomerInfo(customer.getId(), customer.getName(), billing.getBalance());

如果某个 API 调用失败,则onStatus()方法中定义的错误处理会将 HTTP 错误状态映射到ApiGatewayException。在这里,我们使用传统方法,而不是通过Mono.error()方法采用被动替代方法。由于我们的客户期望同步 API,因此我们抛出了会传播给调用者的异常。

尽管客户和计费系统具有异步特性,WebClient的block()方法使我们能够从两个来源聚合数据并向客户端透明地返回组合结果。

优化多个 API 调用
此外,由于我们要对不同的系统进行两次连续调用,因此我们可以通过避免单独阻止每个响应来优化流程。我们可以执行以下操作:

private CustomerInfo getCustomerInfoBlockCombined(Long customerId) {
    Mono<Customer> customerMono = webClient.get()
      .uri(uriBuilder -> uriBuilder.path(CustomerController.PATH_CUSTOMER)
        .pathSegment(String.valueOf(customerId))
        .build())
      .retrieve()
      .onStatus(status -> status.is5xxServerError() || status.is4xxClientError(), response -> response.bodyToMono(String.class)
        .map(ApiGatewayException::new))
      .bodyToMono(Customer.class);
    Mono<Billing> billingMono = webClient.get()
      .uri(uriBuilder -> uriBuilder.path(BillingController.PATH_BILLING)
        .pathSegment(String.valueOf(customerId))
        .build())
      .retrieve()
      .onStatus(status -> status.is5xxServerError() || status.is4xxClientError(), response -> response.bodyToMono(String.class)
        .map(ApiGatewayException::new))
      .bodyToMono(Billing.class);
    return Mono.zip(customerMono, billingMono, (customer, billing) -> new CustomerInfo(customer.getId(), customer.getName(), billing.getBalance()))
      .block();
}

zip()是一种将多个Mono实例组合成单个Mono的方法。当所有给定的Mono都生成了它们的值时,新的Mono就完成了,然后根据指定的函数对这些值进行聚合 - 在我们的例子中,创建了一个CustomerInfo对象。这种方法更高效,因为它允许我们同时等待两个服务的组合结果。

为了验证我们是否提高了性能,让我们在两种情况下运行测试:

@Autowired
private WebTestClient testClient;
@Test
void givenApiGatewayClient_whenBlockingCall_thenResponseReceivedWithinDefinedTimeout() {
    Long customerId = 10L;
    assertTimeout(Duration.ofSeconds(CustomerController.SLEEP_DURATION.getSeconds() + BillingController.SLEEP_DURATION.getSeconds()), () -> {
        testClient.get()
          .uri(uriBuilder -> uriBuilder.path(ApiGatewayController.PATH_CUSTOMER_INFO)
            .pathSegment(String.valueOf(customerId))
            .build())
          .exchange()
          .expectStatus()
          .isOk();
    });
}

最初,测试失败。但是,在切换到等待组合结果后,测试在客户和计费系统调用的总持续时间内完成。这表明我们通过聚合两个服务的响应提高了性能。即使我们使用阻塞同步方法,我们仍然可以遵循最佳实践来优化性能。这有助于确保系统保持高效和可靠。