使用反应式编程替换Java自动资源管理 - Arvind

21-11-19 banq

自动资源管理(Automatic resource management 简称ARM)在 Java 7 中首次引入时是一个受欢迎的特性,也就是通常说的无需finally的try()用法。

然后ARM 继续以意想不到的方式污染代码。这样做的一个重要原因是try块的结尾代表了需要释放资源的时刻,该块的结束也代表异常处理程序、局部变量作用域等的结束。

因此,虽然 ARM 确保可靠地释放事物,但它在资源有资格被释放时失去了一些控制权. 试图改变发布的时间会干扰整个代码流。

以下是 ARM 如何引入非预期代码块的示例。

try (val client = HttpClientBuilder.create().build()) {
    HttpGet request = new HttpGet(urlPath);
    request.setHeader("content-type", "application/json");
    try (val httpResponse = client.execute(request)) {
        if (httpResponse.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
            String entityString = EntityUtils.toString(httpResponse.getEntity());
            PaginatedResponse<City> siloResponse =
                    gson.fromJson(entityString, new TypeToken<PaginatedResponse<City>>(){}.getType());
            cities.addAll(siloResponse.getResults());
            if (siloResponse.getNext() != null) {
                recursiveRead(cities, siloResponse.getNext());
            } else {
                log.info("Fetched:", cities.size());
            }
        } else {
            log.error("Failed to get Cities");
        }
    }
}

请注意,对象client和httpResponse对象都在 A​​RM 下。这引入的缩进和变量作用域实际上是一种眼中钉并损害了易用性。这让我想到了下一点……

 

有一个非常微妙的性能错误

注意以下几点

  • 页面的遍历使用递归实现
  • 仅当方法本身完成时,托管资源如client和httpResponse才有资格被释放

这意味着如果一个调用跨越 200 个页面,那么有 200 个client对象同时处于活动状态,并且在整个结果集被迭代之前它们没有资格被释放。事实证明,这个模块嵌入在一个微服务中,可以扩展到超过一百个实例。

所以,如果有一个查询确实返回了 200 页的结果,这段代码就会在远程服务器上创建一个 200 倍的网络放大攻击!

即使在命令式世界中,也可以直接解决此问题。困难的部分是首先知道存在问题。那么这在反应式世界中是如何实现的呢?

Mono<PaginatedResponse<City>> siloResponse = WebClient.create().get()
    .uri(urlPath)
    .accept(MediaType.APPLICATION_JSON)
    .retrieve()
    .bodyToMono(new ParameterizedTypeReference<PaginatedResponse<City>>() {
    });

这是使用流畅的功能风格不仅仅是语法糖的部分。Spring webclient 的创建者做了一些聪明的事情,其中bodyToMono充当隐式信号,表示您已完成与该 HTTP 请求相关的所有网络交互,并且我们已完成根据需要提取响应。

代码:https://github.com/anomalizer/rx-stream-blog

 

取消检查异常和异常类型

多年来,人们普遍认为 Java 中最大的语言设计错误之一是检查异常的概念。

几十年前,错误在软件世界中很少见。原因是多方面的,与本次讨论无关。相关的是,当时的程序员在执行理论上可能出错的操作时不会检查潜在的错误。随着时间的推移,程序遇到错误的几率增加,并且选择忽略错误导致难以在应用程序级别调试问题。Java 的第一个版本是在那个时代设计的,因此,语言设计者希望提供一种能力,其中任何库/模块的创建者都能够传达预期的错误,并且这些库/模块的调用者更好地准备处理他们。检查异常是强制这种行为的好方法。

当前围绕检查异常的遗憾反映了当前时代的问题。首先,我们生活在一个“万事皆失败”的时代。其次,任何现代应用程序堆栈都包含来自多个开源项目、多个商业供应商以及可能来自多个内部团队的库。最终应用程序层中的错误处理逻辑通常仅限于简单区分成功条件与出现问题的情况。很少用一种代码来破译异常类型、解开其中的字段等,并且对错误进行复杂的条件处理。

Java 中的函数式编程不允许存在已检查的异常,而反应式编程在某种程度上鼓励将编码模式限制为 aonSuccess()和onError()处理程序。这在异步处理的世界中成为绝对必须的,因为异常会发生在调用堆栈(即线程)上,而调用堆栈(即线程)与正在使用结果的调用堆栈完全不同。在检查很久以前完成的操作的结果时引发异常会非常奇怪。

 

优雅的取消协作:杀手级功能

微服务世界中使用的一种近乎通用的构造是能够“超时”未完成的操作并继续前进。这些超时是跨进程内库边界和进程间边界实现的。我们将通过考虑分页需要遍历 100 页才能产生最终结果并且“调用者”在遍历 2 页后宣布超时的情况来过度夸大超时模式。

至少需要两个线程才能在命令式方面发生这种情况。第一个线程是运行库的给定调用的地方,第二个线程是运行调用者的地方;等待计时器关闭或返回结果。当计时器响起时,呼叫者继续做其他事情。如果您想知道如何实现这一点,请考虑Future. 然而,其中fetchCity() 方法正在执行的第一个线程没有注意到调用者已经离开的事实。相反,它将继续遍历剩余的 98 页,组合结果并返回。

反应式版本隐式地理解超时并在尽可能早的时间停止未来的处理。这可能根本不明显,所以让我解释一下这是如何发生的。为此,我们需要考虑触发超时时代码可能处于的所有可能的执行状态

Mono<PaginatedResponse<City>> siloResponse = WebClient.create().get()
        .uri(urlPath)
        .accept(MediaType.APPLICATION_JSON)
        .retrieve()
        .bodyToMono(new ParameterizedTypeReference<PaginatedResponse<City>>() {
        });
Flux<City> currentPage = siloResponse.map(PaginatedResponse::getResults).flatMapMany(Flux::fromIterable);
Flux<City> nextPage = siloResponse.map(PaginatedResponse::getNext)
        .filter(x -> !Strings.isNullOrEmpty(x))
        .flatMapMany(ReactiveApproach::recursiveRead2);
return Flux.concat(currentPage, nextPage);

取消流程需要从最后一行开始向后处理,我们将这样做。

  • concat 方法将根据它接下来要处理的一个currentPage或一个发出取消nextPage
  • 如果currentPage正在执行,它将向siloResponseI/O 调用或map/flayMayMap调用发出取消,具体取决于它是哪个阶段
  • 如果nextPage正在执行,则它正在等待递归调用的结果,即flatMapMany(ReactiveApproach::recursiveRead2)。取消被简单地推入这个嵌套调用中。取消的处理随后在嵌套调用中遵循相同的整体流程。
  • 如果siloResponse正在执行,Spring Boot 的WebClient(siloResponse 对象)将中止底层 HTTP 请求/响应处理。

之所以所有这一切都是可能的,是因为反应模型是基于拉动的模型。调用者可以通过各种机制发出信号,表明它不再对结果感兴趣。设置超时就是这样一种机制。一旦发出这个“不感兴趣”的信号,反应器就会知道结果没有消费者/订阅者,因此它会停止整个流程。

试图在命令式模型中完成这样的事情是非常具有侵入性的。有两种方法可以解决这个问题:使用线程中断或显式传递取消消息持有者对象。然后必须if(notCancelled/notInterrupted){}在每个阶段对代码进行检查,以防止浪费计算。

 

1
猜你喜欢