在响应式SpringBoot中使用WebCLient避免堵塞? - Randal

21-06-11 banq

响应式编程的关键是做出反应。你不会说“现在就做”,而是说“什么时候做”。“何时”适用于您有工作要做的时间。工作以事件的形式出现:消息总线上的消息或 HTTP 请求。

首先,我应该解释响应式编程很重要的原因。Java 的优点之一是相对容易的线程处理。这使得线程成为处理事件的主要模型。当你得到一个事件时,你会分派一个线程来处理它。问题是当你收到很多事件时,你最终会创建很多线程。线程可能很昂贵;每个都有堆栈内存,切换线程需要系统调用和上下文切换。

Node.js的系统只有一个线程创建时。(它在 v10.5.0 中引入了工作线程)。然而,它成为一个非常流行的系统,用于构建可以处理数千个请求的服务器。它通过使用事件驱动的习惯用法来处理请求来做到这一点。因为它只有一个线程,所以大多数实现 HTTP 服务器或客户端、数据库客户端或其他 I/O 密集型库的库都必须使用单线程的单个事件循环。

但是 Java 使用了 thread-per-request,这已经成为扩展的瓶颈。像Scala这样的语言,因为它可以更具可扩展性而得名,它是用广泛的框架创建的,以支持事件驱动或异步 I/O。Java 8 引入了CompletableFuture. Java 9 引入了Flow带有Publisher和的类Subscriber。它们被用作两个完全响应式框架RxJavaProject Reactor 的基础

由于 Java 使用 thread-per-request模型这么久,大多数处理 I/O 的库都会阻塞。他们可以阻塞是因为他们希望拥有他们正在运行的线程并且不会阻塞其他请求。但是现在我们可以使用异步模型,它们就成了一个问题。而且因为 Java 现在有一个混合模型,所以很难说在一个主要是异步的系统中何时以及如何使用线程。

  

堵塞案例

我们举一个常见的例子。您有一个接受 HTTP GET 并查询另一个服务的服务。在这种情况下,为了简单起见,上游服务只是http://www.google.com。这是Spring Boot 的实现:

@Slf4j
@RestController
@RequestMapping("/")
public class Controller {
  RestTemplate restTemplate = new RestTemplate();
  @GetMapping("/blocking")
  public Mono<String> getFiles() {
    log.info("querying google");
    return Mono.just(getHttpBlocking("https://www.google.com"))
      .doOnNext(s -> 
           log.info("found content length {}", s.length()));
  }

  String getHttpBlocking(String url) {
    return restTemplate.getForObject(url, String.class);
  }
}

运行结果:

{"@timestamp":"2021-06-07T07:05:57.225-04:00","@version":"1",
"message":"querying google",
"logger_name":"net.kamradtfamily.blockingnono.Controller",
"thread_name":"reactor-http-nio-3",
"level":"INFO","level_value":20000}
{"@timestamp":"2021-06-07T07:05:57.640-04:00","@version":"1",
"message":"found content length 12962",
"logger_name":"net.kamradtfamily.blockingnono.Controller",
"thread_name":"reactor-http-nio-3",
"level":"INFO","level_value":20000}

请注意,返回线程与调用线程相同(thread_name相同)。线程在一段时间内被阻塞,并且必须等待未使用,直到请求完成。

 

如何解决堵塞?

那么我们如何解锁呢?在这种特殊情况下有两种方式:正确的方式和错误的方式。

我们先来看看错误的方法。这涉及创建一个新线程来调用阻塞调用。乍一看,这似乎是弄巧成拙,但它释放了请求线程(在上述情况下reactor-http-nio-3),以便对可能不会阻塞或阻塞时间较短的事物进行更多请求。

为了实现这一点,我们使用Mono.fromCallable代替Mono.just。然后我们使用subscribeOn来改变线程:

return Mono.fromCallable(() ->
    getHttpBlocking("https://www.google.com"))
        .subscribeOn(Schedulers.boundedElastic())

输出:

{"@timestamp":"2021-06-07T07:42:49.709-04:00","@version":"1",
"message":"querying google",
"logger_name":"net.kamradtfamily.blockingnono.Controller",
"thread_name":"reactor-http-nio-3",
"level":"INFO","level_value":20000}
{"@timestamp":"2021-06-07T07:42:49.939-04:00","@version":"1",
"message":"found content length 12991",
"logger_name":"net.kamradtfamily.blockingnono.Controller",
"thread_name":"boundedElastic-1",
"level":"INFO","level_value":20000}

如您所见,输入请求仍由reactor-http-nio-x线程处理,但原始请求的阻塞调用和后续返回由名为 的工作池线程处理boundedElastic-x。该boundedElastic调度程序将创建一个线程池,然后排队使用它们的任务。

我们什么时候会使用这种“错误”的方式?当我们别无选择。如果库提供异步选项,我们应该改用它。

 

SpringBoot的WebCLient

幸运的是,Spring Boot 为我们提供了这样的WebClient类型:

 WebClient webClient = WebClient.create();
Mono<String> getHttpNonBlocking(String url) {
    return webClient
            .get()
            .uri(url)
            .exchangeToMono(cr -> cr.bodyToMono(String.class));
}

现在我们可以直接在控制器类中返回 this ,因为返回类型Mono<String>, 是相同的。运行它并执行查询,您应该会在日志中看到:

{"@timestamp":"2021-06-07T07:35:42.291-04:00","@version":"1",
"message":"querying google",
"logger_name":"net.kamradtfamily.blockingnono.Controller",
"thread_name":"reactor-http-nio-3",
"level":"INFO","level_value":20000}
{"@timestamp":"2021-06-07T07:35:42.998-04:00","@version":"1",
"message":"found content length 12995",
"logger_name":"net.kamradtfamily.blockingnono.Controller",
"thread_name":"reactor-http-nio-5",
"level":"INFO","level_value":20000}

现在您可以看到请求本身的开始是由线程处理的reactor-http-nio-3。但是上游请求的返回和原始请求的后续返回是由线程处理的reactor-http-nio-5。在等待处理上游请求的时间里,没有线程被浪费。

如果你使用Spring启动的WebFlux版本,你应该放弃使用RestTemplate而使用WebClient!

否则,您仍然在阻塞,并且您将无法从 WebFlux 中获得任何优势。

请注意,如果您正在使用 WebFlux 但想要使用 RxJava 库:我还没有找到 RxJava 版本的WebClient,因此您必须转换。Project Reactor 有一个专门用于此目的的适配器:

@GetMapping("/rxjavanonblocking")
public Single<String> getURLRxJavaNonBlocking() {
    log.info("querying google");
    return RxJava2Adapter.monoToSingle(
       getHttpNonBlocking("https://www.google.com")
            .doOnNext(s -> 
               log.info("found content length {}", s.length())));
}

其他库可能不提供异步方法,因此您将陷入使用“错误”方法处理阻塞方法作为分配给工作池的任务的困境。如果您正在处理一个新项目,如果它们提供异步库,这可能会影响您在数据库和消息队列等方面的决定。

 

猜你喜欢