Spring WebFlux 和java2-jdbc结合案例

Spring WebFlux是一种创建非阻塞REST应用程序的好方法。但是,一般人开始使用WebFlux时都会遇到的一个问题就是JDBC连接池是一种阻塞方式,如何让JDBC连接池堵塞不会对整个请求的访问路径造成堵塞呢?

像Cassandra或Couchbase这样新数据库都提供了非阻塞的驱动程序。比如Couchbase的驱动程序就是使用RXJava。NoSQL数据库非常棒,但由于各种原因,实际中可能并不都是使用它们。而关系数据库在创建驱动程序异步方面还要付出一些努力,比如Oracle正在推出异步数据库驱动ADBA。不幸的是,这种异步数据库驱动真的还处于早期阶段。

但是现在情况下是,只要你想使用JVM和SQL数据库进行交互,你就会需要使用通常的阻塞JDBC驱动程序。幸运的是,如果使用Spring WebFlux构建非阻塞应用程序,那么reactor-core和rxjava2可以帮助你实现异步的反应式的ReactiveStreams。

这就意味着您可以使用David Moten的rxjava2-jdbc库,尽管它采取的是普通堵塞式的JDBC驱动程序,但是可以让我们的应用程序以不会堵塞的方式与数据库进行交互。它是通过在不同线程上调度背后的阻塞来实现的 此外,它还具有DSL,可以将SQL语句和结果建模为流Stream,这些整合都使得响应式编程变得更简单。

Rxjava2-jdbc还提供了非阻塞连接池,通常在阻塞应用程序中,如果线程被阻塞,会一直到有可用的连接才会继续。但是这对于非阻塞应用程序来说却是一个问题,因为非堵塞应用会启动新的数据库连接,这将很快耗尽所有数据库连接线程,并将美妙的非阻塞应用程序变为笨重的阻塞应用程序。在rxjava2-jdbc中,其连接池将返回由连接池控制的一个Single,这意味着,不是为每次查询分配一个线程,查询会订阅连接池,并在有可用时接收连接 - 同时不会阻塞应用程序的线程!

说了这么多,还是看看示例吧,下面我创建了一个使用Spring WebFlux,rxjava2-jdbc和H2的示例。该示例是使用嵌入式H2数据库的简单REST应用程序。它源码在这里:https://github.com/netifi/webflux-rxjava2-jdbc-example

这个例子有两个对象:第一个是一个Employee对象,它包含关于员工的信息,包括:名字,姓氏和部门ID。第二个对象是Department对象,其中包含有关不同部门的信息。该应用程序具有restful的端点,用于部门的员工和列表操作的基本创建,读取,删除和列表操作。

运行示例
运行该示例很简单,可以使用以下命令完成:

./gradlew clean run

该命令将配置数据库并使用Spring Boot启动WebFlux应用程序。该示例不需要运行servlet容器,而是使用Netty。如果您想测试并查看示例是否已启动并正在运行,则可以运行以下curl命令:

curl localhost:8080 / employees

如果运行正常,您应该看到JSON员工列表。

路由请求
我们来看看应用程序,看看它在做什么,首先看一下EmployeeRouter类。该类根据URL和HTTP方法将传入的HTTP请求路由到适当的处理程序。


@Configuration
public class EmployeeRouter {
@Bean
public RouterFunction<ServerResponse> route(EmployeeHandler handler) {
return RouterFunctions.route(
GET("/employees").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),
handler::getAllEmployees)
.andRoute(
GET(
"/employee/fn/{fn}/ln/{ln}")
.and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),
handler::getEmployee)
.andRoute(
PUT(
"/employee").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),
handler::createNewEmployee)
.andRoute(
DELETE(
"/employee/id/{id}").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),
handler::deleteEmployee)
.andRoute(
GET(
"/departments").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),
handler::getAllDepartments);
}
}

在Spring WebFlux中,因为它是非阻塞的,所以这些处理程序必须返回一个Mono <T>。

处理请求
下面要看的类是处理请求的类EmployeeHandler。


@Component
public class EmployeeHandler {
private final EmployeeRepository repository;

public EmployeeHandler(EmployeeRepository repository) {
this.repository = repository;
}

public Mono<ServerResponse> getAllEmployees(ServerRequest request) {
Flux<Employee> employees = repository.getAllEmployees();
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(employees, Employee.class);
}

public Mono<ServerResponse> getEmployee(ServerRequest request) {
String firstName = request.pathVariable("fn");
String lastName = request.pathVariable(
"ln");
Mono<Employee> employee = repository.getEmployee(firstName, lastName);

return ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(employee, Employee.class);
}

public Mono<ServerResponse> createNewEmployee(ServerRequest request) {
Mono<Employee> employeeMono = request.bodyToMono(Employee.class);
Mono<Employee> employee = repository.createNewEmployee(employeeMono);

return ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(employee, Employee.class);
}

public Mono<ServerResponse> deleteEmployee(ServerRequest request) {
String id = request.pathVariable(
"id");
Mono<Void> employee = repository.deleteEmployee(id);

return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).build(employee);
}

public Mono<ServerResponse> getAllDepartments(ServerRequest request) {
Flux<Department> allDepartments = repository.getAllDepartments();
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(allDepartments, Department.class);
}
}

这是负责处理传入请求的类。每个句柄方法返回一个Mono <ServerResponse>。ServerResponse对象具有一个用于创建响应的构建器,并且有一些方法可以使用Fluxes和Monos进行响应,该ServerResponse对象也将自动序列化你的结果。此外,如果想返回响应流,WebFlux会自动将响应转换为JSON列表。你可以看到这个在getAllEmployees和getAllDepoartment方法中实现了。更近一点的方法是deleteEmployee方法,该deleteEmployee方法与其他方法稍有不同。您会注意到大多数方法都使用.body方法来返回响应。该deleteEmployee却没有.body方法,它使用.build方法来返回响应,这实际用来返回Mono<Void>。

见下页


与数据库交互
下面一步是实现从数据库中获取信息返回响应,负责从数据库获取信息的处理类是EmployeeRepository,它是被注入到前面的请求处理类中。通常情况下,数据库驱动方式是数据库堵塞方式。在这个例子中,即使我们使用了rxjava2-jdbc,虽然这确实将JDBC驱动程序转变为非阻塞驱动程序,并且排除了可能会阻塞事件循环的可能,但这并不像纯粹的非阻塞方法那样有效。不过,这对于除了吞吐量特别高的应用程序之外的其他应用程序也已经足够。让我们看看EmployeeRepository类:


@Component
public class EmployeeRepository {
private Database db;

public EmployeeRepository() throws Exception {
Connection connection = DriverManager.getConnection("jdbc:h2:./build/mydatabase", "sa", "sa");
NonBlockingConnectionPool pool =
Pools.nonBlocking()
.maxPoolSize(Runtime.getRuntime().availableProcessors() * 5)
.connectionProvider(ConnectionProvider.from(connection))
.build();

this.db = Database.from(pool);
}

Flux<Employee> getAllEmployees() {
String sql =
"SELECT * FROM employee e JOIN department d ON e.department_id = d.department_id";

Flowable<Employee> employeeFlowable =
db.select(sql)
.get(
rs -> {
Employee employee = new Employee();
employee.setId(rs.getInt(
"employee_id"));
employee.setFirstName(rs.getString(
"employee_firstname"));
employee.setLastName(rs.getString(
"employee_lastname"));
employee.setDepartment(rs.getString(
"department_name"));

return employee;
});

return Flux.from(employeeFlowable);
}

Mono<Employee> getEmployee(String firstName, String lastName) {
String sql =
"SELECT employee_id, employee_firstname, employee_lastname, department_name FROM employee e "
+
"JOIN department d ON e.department_id = d.department_id "
+
"WHERE employee_firstname = ? AND "
+
"employee_lastname = ?";

Flowable<Employee> employeeFlowable =
db.select(sql)
.parameters(firstName, lastName)
.get(
rs -> {
Employee employee = new Employee();
employee.setId(rs.getInt(
"employee_id"));
employee.setFirstName(rs.getString(
"employee_firstname"));
employee.setLastName(rs.getString(
"employee_lastname"));
employee.setDepartment(rs.getString(
"department_name"));

return employee;
});

return Mono.from(employeeFlowable);
}

Mono<Employee> createNewEmployee(Mono<Employee> employeeMono) {

String createSql =
"INSERT INTO employee (employee_firstname, employee_lastname, department_id) VALUES (?, ?, ?)";
String selectDepartmentId =
"SELECT department_id from department where department_name = ?";
String selectSql =
"SELECT employee_id, employee_firstname, employee_lastname, department_name FROM employee e "
+
"JOIN department d ON e.department_id = d.department_id "
+
"WHERE employee_id = ?";

return employeeMono.flatMap(
newEmployee -> {
Flowable<Integer> employeeIds =
db.select(selectDepartmentId)
.parameters(newEmployee.getDepartment())
.getAs(Integer.class)
.flatMap(
departmentId ->
db.update(createSql)
.parameters(
newEmployee.getFirstName(),
newEmployee.getLastName(),
departmentId)
.returnGeneratedKeys()
.getAs(Integer.class));

Flowable<Employee> employeeFlowable =
db.select(selectSql)
.parameterStream(employeeIds)
.get(
rs -> {
Employee employee = new Employee();
employee.setId(rs.getInt(
"employee_id"));
employee.setFirstName(rs.getString(
"employee_firstname"));
employee.setLastName(rs.getString(
"employee_lastname"));
employee.setDepartment(rs.getString(
"department_name"));

return employee;
});

return Mono.from(employeeFlowable);
});
}

Mono<Void> deleteEmployee(String id) {
String sql =
"DELETE FROM employee WHERE employee_id = ?";
Flowable<Integer> counts = db.update(sql).parameter(id).counts();
return Flux.from(counts).then();
}

Flux<Department> getAllDepartments() {
return Flux.from(db.select(Department.class).get());
}
}

首先要看的是构造函数, rxjava2-jdbc中的主类是Database类。这是用来与数据库交互并运行查询的内容。当前之前需要创建一个数据库连接。在这个例子中创建了H2数据库连接。其他数据库能够在连接字符串中输入用户名和密码:

Database db = Database.from(“<connection_string”>,<pool_size>);

然而H2却不行,它需要创建一个Connection对象来传入证书。然后必须使用“池”构建器来创建池。一旦创建了池,你就可以创建数据库对象。

rxjava2-jdbc的好处是可以使用任何JDBC驱动程序。您可以切换任何标准JDBC数据库,如Oracle,SQL Server,DB2等。

这里不打算详细介绍rxjava2-jdbc是如何工作的,因为David Moten的github有很好的文档


Spring WebFlux and rxjava2-jdbc – Netifi – Medium