反应性和非反应性代码的分离 - DZone


避免在使用 Project Reactor 时因混合反应性和非反应性逻辑而导致的意外行为。
在使用 Project Reactor 或任何其他反应式流实现时要记住的最重要区别之一是代码执行中组装assembly时间与订阅时间之间的区别:
换句话说,反应式发布者(Flux和Mono)是惰性的,因此在有人订阅之前不会发布或处理任何元素。
了解这种区别至关重要,因为在编写实际应用程序时,我们希望所有(或大部分)业务逻辑在订阅时执行。在这篇文章中,我们将展示不遵守此规则会出现什么样的问题以及如何缓解这些问题。
 
租车服务示例
为了举例说明这一点,我们将使用一个非常简单的虚拟汽车租赁服务实现。该服务接受包含客户姓名、年龄和电子邮件地址以及汽车型号的输入。它首先检查客户是否年满 18 岁(因此在法律上允许租车),然后将租赁请求保存到数据库中,最后生成 PDF 收据并将其通过电子邮件发送给客户。
该流程由以下rentCar方法实现:

private static Mono<UUID> rentCar(CarRentalRequest request) {
    if (request.getCustomerAge() > 18) {
        UUID rentalId = UUID.randomUUID(); // Generate an ID for the new rental
        return saveCarRental(rentalId, request)
// Save the rental entity to the database
            .then(buildAndSendPdfReceipt(rentalId, request))
// Generate and send PDF report
            .then(Mono.just(rentalId));
// Return the ID of the new rental
    } else {
        return Mono.error(new RuntimeException(
"Must be 18 to rent a car"));
    }
}

private static Mono<Void> buildAndSendPdfReceipt(UUID rentalId, CarRentalRequest carRentalRequest) {
    byte[] pdfReceipt = buildPdfReceipt(rentalId, carRentalRequest);
    return sendPdfReceipt(pdfReceipt, carRentalRequest.getCustomerEmail());
}

然后我们可以调用这个方法来创建发布者。此外,我们希望确保将工作委托给单独的调度程序,以便主线程可以继续处理其他请求。我们可以使用subscribeOn操作符来实现这一点(它改变了整个管道的执行上下文,包括上文和下文,因此顶级发布者将通过Scheduler生成集合上的元素)。最后,我们提供了一个订阅者,它定义了成功和错误响应时要执行的逻辑(subscribe()分别是方法中的两个 lambda 参数)。
下面我们提供了一个订阅者,它定义了成功和错误响应时要执行的逻辑(subscribe()分别是方法中的两个 lambda 参数)。

CarRentalRequest request = new CarRentalRequest("Alice", 30, "Hyundai i30", "alice@mail.com");

rentCar(request)
    .subscribeOn(Schedulers.boundedElastic())
    .subscribe(s -> log.info(
"Car rented successfully, rental ID: {}", s), 
        e -> log.error(
"Could not rent car: {}", e.getMessage(), e));

 
陷阱 1:不正确的执行上下文
通过仔细观察第一段代码中buildAndSendPdfReceipt方法,人们很容易猜到buildPdfReceipt是一种同步的、非响应式的方法:它不返回任何响应式类型。
但是,如果我们运行这个例子,我们会得到以下输出:

21:25:38.961 [main] INFO com.reactordemo.carrental.CarRentalService - Build PDF receipt
21:25:38.986 [boundedElastic-1] INFO com.reactordemo.carrental.CarRentalService - Car rented successfully, rental ID: d5b689dd-fa91-486c-b835-44bc2583d53a

如果我们注意显示每个语句的当前线程的日志部分(在方括号中),我们会注意到订阅者逻辑在方括号boundedElastic-1中的线程上正确执行 ;然而,创建PDF的工作似乎是在main线程上执行的!那么为什么会这样呢?
答案在于上述组装assembly和订阅之间的区别。
我们再来看看这个buildAndSendPdfReceipt方法:

private static Mono<Void> buildAndSendPdfReceipt(UUID rentalId, CarRentalRequest carRentalRequest) {
    byte[] pdfReceipt = buildPdfReceipt(rentalId, carRentalRequest);
    return sendPdfReceipt(pdfReceipt, carRentalRequest.getCustomerEmail());
}

执行此方法时,我们只需要组装反应式管道,即以声明方式定义要执行的步骤以创建 PDF 报告。在这个阶段,我们不应该做生成这个报告的实际工作,这只是在有人订阅这个发布者时才会发生。
不幸的是,这里的情况并非如此 :调用buildPdfReceipt是在这个方法的主体中进行的。这样做的非常不幸的后果之一是我们在上面看到的不正确的执行上下文:
整个管道在main线程上组装assembly,而发布的元素在boundedElastic调度程序上处理。
解决此问题的一种方法是使用fromCallable,以下方法:
private static Mono<Void> buildAndSendPdfReceipt(UUID rentalId, CarRentalRequest carRentalRequest) {
    return Mono.fromCallable(() -> buildPdfReceipt(rentalId, carRentalRequest))
            .flatMap(pdfReceipt -> sendPdfReceipt(pdfReceipt, carRentalRequest.getCustomerEmail()));

正如我们所知,发布者只会在有人订阅时(即在订阅时)开始生成元素,因此buildPdfReceipt现在在所需的调度程序上将调用作为整个管道的一部分进行。事实上,再次运行应用程序会产生以下结果:

21:54:49.955 [boundedElastic-1] INFO com.reactordemo.carrental.CarRentalService - Build PDF receipt
21:54:49.956 [boundedElastic-1] INFO com.reactordemo.carrental.CarRentalService - Car rented successfully, rental ID: a3bb873e-4943-407a-967f-9fa1c1d0d235

在许多复杂的现实生活应用程序中,这种问题很难发现。避免它们的一种好方法是确保反应式方法(即组装管道的方法,通常具有反应式返回类型)不直接调用非反应式方法。相反,它们应该只组装反应管道,优选地在单一的流利语句,以及非反应性的方法的所有呼叫应该从反应性运算符(内部进行fromCallable,fromRunnable,map,filter,等等)。
 
陷阱 2:不正确的异常处理
在设计和实现任何类型的应用程序时,我们总是希望通过尝试恢复或以其他方式向用户显示正确的错误消息来确保我们可以优雅地处理错误。在我们简单的汽车租赁服务中,我们创建了一个带有错误处理程序 lambda 的订阅者,用于记录上游的错误。预期管道中任何地方可能发生的任何错误都将导致描述问题的日志语句。
为了测试这一点,让我们考虑以下输入:

CarRentalRequest request = new CarRentalRequest("Bob", null, "Hyundai i30", "bob@mail.com")

请注意,在这种情况下,客户的年龄被错误地设置为null。即便如此,我们希望这可能导致的任何错误都将被正确拦截和记录。不幸的是,现在运行此代码会产生以下输出:

Exception in thread "main" java.lang.NullPointerException: Cannot invoke "java.lang.Integer.intValue()" because the return value of "com.reactordemo.carrental.CarRentalService$CarRentalRequest.getCustomerAge()" is null
    at com.reactordemo.carrental.CarRentalService.rentCar(CarRentalService.java:27)
    at com.reactordemo.carrental.CarRentalService.entryPoint(CarRentalService.java:19)
    at com.reactordemo.carrental.ReactorDemoApplication.main(ReactorDemoApplication.java:10)

这表明我们的无效输入产生了一个在任何地方都没有被捕获的 NPE。但为什么?为什么没有为这个异常调用我们的错误处理程序?为了理解这一点,让我们再看看我们的主要反应管道:
private static Mono<UUID> rentCar(CarRentalRequest request) {
    if (request.getCustomerAge() > 18) {
        UUID rentalId = UUID.randomUUID();
        return saveCarRental(rentalId, request)
            .then(buildAndSendPdfReceipt(rentalId, request))
            .then(Mono.just(rentalId));
    } else {
        return Mono.error(new RuntimeException("Must be 18 to rent a car"));
    }
}

很明显异常发生在if语句的条件中,我们检查年龄是否大于 18。但请注意,作为管道执行的一部分,此检查不会正确发生。相反,检查是作为组装管道的一部分进行的。因此,这里发生的任何错误都不会被视为处理管道中的元素失败,而是组装管道失败。再一次,这个问题可以通过简单地定义所有特定于反应管道内元素处理(包括检查)的逻辑来避免。

private static Mono<UUID> rentCar(CarRentalRequest request) {
    return Mono.just(request)
        .<CarRentalRequest>handle((req, sink) -> {
            if (req.getCustomerAge() > 18) {
                sink.next(req);
            } else {
                sink.error(new RuntimeException("Must be 18 to rent a car"));
            }
        })
        .flatMap(req -> {
            UUID rentalId = UUID.randomUUID();
            return saveCarRental(rentalId, req)
                    .then(buildAndSendPdfReceipt(rentalId, req))
                    .then(Mono.just(rentalId));
        });
}

在最初的实现中,有两个功能与处理在组装时执行的请求相关:年龄检查和 ID 生成。我们现在已经将它们分别移到管道中,分别在handle和flatMap运算符中。应用此修复程序后,执行会产生以下输出:

12:48:46.627 [boundedElastic-1] ERROR com.reactordemo.carrental.CarRentalService - Could not rent car: Cannot invoke "java.lang.Integer.intValue()" because the return value of "com.reactordemo.carrental.CarRentalService$CarRentalRequest.getCustomerAge()" is null
java.lang.NullPointerException: Cannot invoke
"java.lang.Integer.intValue()" because the return value of "com.reactordemo.carrental.CarRentalService$CarRentalRequest.getCustomerAge()" is null
    at com.reactordemo.carrental.CarRentalService.lambda$rentCarFixed$2(CarRentalService.java:40)


当然,抛出 NPE 而不是验证输入并产生更有意义的错误是不理想的。尽管如此,我们仍然可以看到异常现在在管道内的订阅时抛出,这意味着它最终会被我们的错误处理程序捕获,正如预期的那样。
 
结论
在这篇博文中,我们分析了两种情况,其中不正确地分离汇编时间和订阅时间逻辑会导致我们的应用程序出现不良行为。为了减轻此类问题和其他问题,我们建议明确分离如下:

  • 作为反应式方法(组装反应式管道的方法,即具有反应式返回类型)的一部分,避免执行除严格构建管道之外的任务。
  • 此类方法的一个好的做法是确保它们只组装和返回反应式管道,最好在单个流利风格的语句中。
  • 任何重要的逻辑(通常与元素处理有关,而不是组装管道),例如输入验证或映射、对其他同步方法的调用等,都应作为管道的一部分执行。这可以使用大量运算符来实现,本文举例说明了其中的几个,例如handle, flatMap, fromCallable。