使用Reactor将阻塞调用变为异步非阻塞


Project Reactor 是一个完全无阻塞的基础库,包括背压支持。但在某些情况下,库包含复杂的阻塞方法而没有异步实现。在Reactor 流中调用此方法会产生不好的结果。

下面代码中我们使用了HttpsURLConnection,它实际是一个阻塞调用,

public String get(String url) throws IOException {
    HttpURLConnection connection = (HttpsURLConnection) new URL(url).openConnection();
    connection.setRequestMethod("GET");
    connection.setDoOutput(true);
    try(InputStream inputStream = connection.getInputStream()) {
        return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
    }
}

为此,我们需要一个Scheduler。对于阻塞调用,我们将使用Reactor的boundedElastic调度程序。调度程序也可以由现有的执行程序服务创建。
因此,让我们将上述方法转换为非阻塞方法:
package com.gkatzioura.blocking;
 
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
 
public class BlockingAsyncService {
 
    private final BlockingService blockingService;
 
    public BlockingAsyncService(BlockingService blockingService) {
        this.blockingService = blockingService;
    }
 
    private Mono<String> get(String url) {
        return Mono.fromCallable(() -> blockingService.get(url))
                .subscribeOn(Schedulers.boundedElastic());
    }
 
}

我们可以看到的是从可调用方法创建的 Mono。调度程序因为订阅此Mono,将接收发出的事件,该事件应被安排执行。
让我们做个测试:
package com.gkatzioura.blocking;
 
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
 
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
 
class BlockingAsyncServiceTest {
 
    private BlockingAsyncService blockingAsyncService;
 
    @BeforeEach
    void setUp() {
        blockingAsyncService = new BlockingAsyncService(new BlockingService());
    }
 
    @Test
    void name() {
        StepVerifier.create(
                            Mono.just("https://www.google.com/")
                                .map(s -> blockingAsyncService.get(s))
                                .flatMap(s -> s)
                    )
                .consumeNextWith(s -> s.startsWith(
"<!doctype"))
                .verifyComplete();
    }
}

最好的办法是找到一种方法将这种阻塞调用转化为异步调用,并尝试使用现有的异步库找到解决方法。当它不可行时,我们可以回退使用线程。