Project Reactor 是一个完全无阻塞的基础库,包括背压支持。但在某些情况下,库包含复杂的阻塞方法而没有异步实现。在Reactor 流中调用此方法会产生不好的结果。
下面代码中我们使用了HttpsURLConnection,它实际是一个阻塞调用,
public String get(String url) throws IOException { HttpURLConnection connection = (HttpsURLConnection) new URL(url).openConnection(); connection.setRequestMethod("GET"); connection.setDoOutput(true); try(InputStream inputStream = connection.getInputStream()) { return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8); } }
|
为此,我们需要一个Scheduler。对于阻塞调用,我们将使用Reactor的boundedElastic调度程序。调度程序也可以由现有的执行程序服务创建。
因此,让我们将上述方法转换为非阻塞方法:
package com.gkatzioura.blocking; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; public class BlockingAsyncService { private final BlockingService blockingService; public BlockingAsyncService(BlockingService blockingService) { this.blockingService = blockingService; } private Mono<String> get(String url) { return Mono.fromCallable(() -> blockingService.get(url)) .subscribeOn(Schedulers.boundedElastic()); } }
|
我们可以看到的是从可调用方法创建的 Mono。调度程序因为订阅此Mono,将接收发出的事件,该事件应被安排执行。
让我们做个测试:
package com.gkatzioura.blocking; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; class BlockingAsyncServiceTest { private BlockingAsyncService blockingAsyncService; @BeforeEach void setUp() { blockingAsyncService = new BlockingAsyncService(new BlockingService()); } @Test void name() { StepVerifier.create( Mono.just("https://www.google.com/") .map(s -> blockingAsyncService.get(s)) .flatMap(s -> s) ) .consumeNextWith(s -> s.startsWith("<!doctype")) .verifyComplete(); } }
|
最好的办法是找到一种方法将这种阻塞调用转化为异步调用,并尝试使用现有的异步库找到解决方法。当它不可行时,我们可以回退使用线程。