Spring WebFlux是一种创建非阻塞REST应用程序的好方法。但是,一般人开始使用WebFlux时都会遇到的一个问题就是JDBC连接池是一种阻塞方式,如何让JDBC连接池堵塞不会对整个请求的访问路径造成堵塞呢?
像Cassandra或Couchbase这样新数据库都提供了非阻塞的驱动程序。比如Couchbase的驱动程序就是使用RXJava。NoSQL数据库非常棒,但由于各种原因,实际中可能并不都是使用它们。而关系数据库在创建驱动程序异步方面还要付出一些努力,比如Oracle正在推出异步数据库驱动ADBA。不幸的是,这种异步数据库驱动真的还处于早期阶段。
但是现在情况下是,只要你想使用JVM和SQL数据库进行交互,你就会需要使用通常的阻塞JDBC驱动程序。幸运的是,如果使用Spring WebFlux构建非阻塞应用程序,那么reactor-core和rxjava2可以帮助你实现异步的反应式的ReactiveStreams。
这就意味着您可以使用David Moten的rxjava2-jdbc库,尽管它采取的是普通堵塞式的JDBC驱动程序,但是可以让我们的应用程序以不会堵塞的方式与数据库进行交互。它是通过在不同线程上调度背后的阻塞来实现的 此外,它还具有DSL,可以将SQL语句和结果建模为流Stream,这些整合都使得响应式编程变得更简单。
Rxjava2-jdbc还提供了非阻塞连接池,通常在阻塞应用程序中,如果线程被阻塞,会一直到有可用的连接才会继续。但是这对于非阻塞应用程序来说却是一个问题,因为非堵塞应用会启动新的数据库连接,这将很快耗尽所有数据库连接线程,并将美妙的非阻塞应用程序变为笨重的阻塞应用程序。在rxjava2-jdbc中,其连接池将返回由连接池控制的一个Single,这意味着,不是为每次查询分配一个线程,查询会订阅连接池,并在有可用时接收连接 - 同时不会阻塞应用程序的线程!
说了这么多,还是看看示例吧,下面我创建了一个使用Spring WebFlux,rxjava2-jdbc和H2的示例。该示例是使用嵌入式H2数据库的简单REST应用程序。它源码在这里:https://github.com/netifi/webflux-rxjava2-jdbc-example。
这个例子有两个对象:第一个是一个Employee对象,它包含关于员工的信息,包括:名字,姓氏和部门ID。第二个对象是Department对象,其中包含有关不同部门的信息。该应用程序具有restful的端点,用于部门的员工和列表操作的基本创建,读取,删除和列表操作。
运行示例
运行该示例很简单,可以使用以下命令完成:
./gradlew clean run
该命令将配置数据库并使用Spring Boot启动WebFlux应用程序。该示例不需要运行servlet容器,而是使用Netty。如果您想测试并查看示例是否已启动并正在运行,则可以运行以下curl命令:
curl localhost:8080 / employees
如果运行正常,您应该看到JSON员工列表。
路由请求
我们来看看应用程序,看看它在做什么,首先看一下EmployeeRouter类。该类根据URL和HTTP方法将传入的HTTP请求路由到适当的处理程序。
@Configuration public class EmployeeRouter { @Bean public RouterFunction<ServerResponse> route(EmployeeHandler handler) { return RouterFunctions.route( GET("/employees").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)), handler::getAllEmployees) .andRoute( GET("/employee/fn/{fn}/ln/{ln}") .and(RequestPredicates.accept(MediaType.APPLICATION_JSON)), handler::getEmployee) .andRoute( PUT("/employee").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)), handler::createNewEmployee) .andRoute( DELETE("/employee/id/{id}").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)), handler::deleteEmployee) .andRoute( GET("/departments").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)), handler::getAllDepartments); } }
|
在Spring WebFlux中,因为它是非阻塞的,所以这些处理程序必须返回一个Mono <T>。
处理请求
下面要看的类是处理请求的类EmployeeHandler。
@Component public class EmployeeHandler { private final EmployeeRepository repository;
public EmployeeHandler(EmployeeRepository repository) { this.repository = repository; }
public Mono<ServerResponse> getAllEmployees(ServerRequest request) { Flux<Employee> employees = repository.getAllEmployees(); return ServerResponse.ok() .contentType(MediaType.APPLICATION_JSON) .body(employees, Employee.class); }
public Mono<ServerResponse> getEmployee(ServerRequest request) { String firstName = request.pathVariable("fn"); String lastName = request.pathVariable("ln"); Mono<Employee> employee = repository.getEmployee(firstName, lastName);
return ServerResponse.ok() .contentType(MediaType.APPLICATION_JSON) .body(employee, Employee.class); }
public Mono<ServerResponse> createNewEmployee(ServerRequest request) { Mono<Employee> employeeMono = request.bodyToMono(Employee.class); Mono<Employee> employee = repository.createNewEmployee(employeeMono);
return ServerResponse.ok() .contentType(MediaType.APPLICATION_JSON) .body(employee, Employee.class); }
public Mono<ServerResponse> deleteEmployee(ServerRequest request) { String id = request.pathVariable("id"); Mono<Void> employee = repository.deleteEmployee(id);
return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).build(employee); }
public Mono<ServerResponse> getAllDepartments(ServerRequest request) { Flux<Department> allDepartments = repository.getAllDepartments(); return ServerResponse.ok() .contentType(MediaType.APPLICATION_JSON) .body(allDepartments, Department.class); } }
|
这是负责处理传入请求的类。每个句柄方法返回一个Mono <ServerResponse>。ServerResponse对象具有一个用于创建响应的构建器,并且有一些方法可以使用Fluxes和Monos进行响应,该ServerResponse对象也将自动序列化你的结果。此外,如果想返回响应流,WebFlux会自动将响应转换为JSON列表。你可以看到这个在getAllEmployees和getAllDepoartment方法中实现了。更近一点的方法是deleteEmployee方法,该deleteEmployee方法与其他方法稍有不同。您会注意到大多数方法都使用.body方法来返回响应。该deleteEmployee却没有.body方法,它使用.build方法来返回响应,这实际用来返回Mono<Void>。
见下页