基于Vert.x和SpringBoot实现响应式开发

Vert.x是作为一个事件总线的设计,以保证应用中不同部分以一种非堵塞的线程安全方式通讯,其原理来自于Erlang和Akka,它是能充分利用多核处理器性能并实现高并发编程的需求。

所有Vert.x 的VERTICLE缺省是一个单线程,不像Node.js只有一个单线程,vert.x能在很多线程中运行很多VERTICLE,每个线程一个VERTICLE,这样你可以指定几个VERTICLE作为"worker",能够以多线程方式运行你的任务。

Vert.x可以通过使用Hazelcast实现底层的事件总线的多节点集群。

下面以一个案例说明如何基于 Spring Boot, Spring Data JPA, 和 Spring REST开发一个聊天室系统?

下面没有引入Vert.x而是使用传统Spting语法实现的系统:


@SpringBootApplication
@EnableJpaRepositories
@EnableTransactionManagement
@Slf4j
public class Application {
public static void main(String[] args) {
ApplicationContext ctx = SpringApplication.run(Application.class, args);

System.out.println("Let's inspect the beans provided by Spring Boot:");

String[] beanNames = ctx.getBeanDefinitionNames();
Arrays.sort(beanNames);
for (String beanName : beanNames) {
System.out.println(beanName);
}
}

@Bean
public DataSource dataSource() {
EmbeddedDatabaseBuilder builder = new EmbeddedDatabaseBuilder();
return builder.setType(EmbeddedDatabaseType.HSQL).build();
}

@Bean
public EntityManagerFactory entityManagerFactory() {
HibernateJpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter();
vendorAdapter.setGenerateDdl(true);

LocalContainerEntityManagerFactoryBean factory = new LocalContainerEntityManagerFactoryBean();
factory.setJpaVendorAdapter(vendorAdapter);
factory.setPackagesToScan(
"com.zanclus.data.entities");
factory.setDataSource(dataSource());
factory.afterPropertiesSet();

return factory.getObject();
}

@Bean
public PlatformTransactionManager transactionManager(final EntityManagerFactory emf) {
final JpaTransactionManager txManager = new JpaTransactionManager();
txManager.setEntityManagerFactory(emf);
return txManager;
}
}

上述代码中 @Bean是提供了访问JPA的EntityManager, TransactionManager, 和 DataSource. 这是一个标准的Springboot案例源码。前端使用CustomerEnpoints作为RESTful的控制器,接受客户端的请求。

CustomerVerticle 类是作为@Component,意味着在启动时,Spring会初始化这个类,它也有@PostConstruct标注的start方法,这样Verticle 在启动被加载时会执行其方法内容:


@PostConstruct
public void start() throws Exception {
Router router = Router.router(vertx);
router.route().handler(BodyHandler.create());
router.get("/v1/customer/:id")
.produces(
"application/json")
.blockingHandler(this::getCustomerById);
router.put(
"/v1/customer")
.consumes(
"application/json")
.produces(
"application/json")
.blockingHandler(this::addCustomer);
router.get(
"/v1/customer")
.produces(
"application/json")
.blockingHandler(this::getAllCustomers);
vertx.createHttpServer().requestHandler(router::accept).listen(8080);
}

在这个启动方法中,引入了vertx-web库包:Router,能够让用户定义将请求过滤转换为HTTP URLs, methods, 和头部header 等信息,BodyHandler 是能将POST/PUT提交的内容转变为一个JSON对象,Vert.x能够将其作为RoutingContext的一部分进行处理;RoutingContext包含Vert.x请求对象和响应对象和任何Http请求中数据,包括POST内容数据等等,blockingHandler是接受RoutingContext作为输入参数。

注意到blockingHandler方法使用了Java 8的方法引用,如this::getCustomerById,这比使用lambda将逻辑插入blockingHandler 中更具有代码可读性。getCustomerById的方法如下:


private void getCustomerById(RoutingContext rc) {
log.info("Request for single customer");
Long id = Long.parseLong(rc.request().getParam(
"id"));
try {
Customer customer = dao.findOne(id);
if (customer==null) {
rc.response().setStatusMessage(
"Not Found").setStatusCode(404).end("Not Found");
} else {
rc.response().setStatusMessage(
"OK").setStatusCode(200).end(mapper.writeValueAsString(dao.findOne(id)));
}
} catch (JsonProcessingException jpe) {
rc.response().setStatusMessage(
"Server Error").setStatusCode(500).end("Server Error");
log.error(
"Server error", jpe);
}
}

在getCustomerById方法中实现真正的业务处理,比如调用DAO访问数据库,序列化和反序列化对象,类似在MVC的控制器中实现一样。

完整见Github

这段代码的主要问题是性能的扩展性有限,这段代码要么可以在Tomcat中运行,要么以微服务方式在嵌入式服务器如Jetty或undertow中运行,总之,只能是一个请求捆绑一个线程,当在等待I/O等堵塞操作时,所有的资源都会消耗在等待上,这种局部堵塞引发整体等待是一种资源浪费。

而如果我们引入了Vert.x,使用@Bean注入了Vertx实例:


public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}

private Vertx vertx;

/**
* Create an {@link ObjectMapper} for use in (de)serializing objects to/from JSON
* @return An instance of {@link ObjectMapper}
*/

@Bean
public ObjectMapper objectMapper() {
return new ObjectMapper(new JsonFactory());
}

/**
* A singleton instance of {@link Vertx} which is used throughout the application
* @return An instance of {@link Vertx}
*/

@Bean
public Vertx getVertxInstance() {
if (this.vertx==null) {
this.vertx = Vertx.vertx();
}
return this.vertx;
}
....

访问JPA的代码基本没有变,主要是RESTful控制器更换了,使用CustomerVerticle替代了CustomerEnpoints。

完整代码见:Convert-To-Vert.x-Web

你可能觉得这比原来Spring代码更复杂了些,这里只是介绍案例源码,真正实战中可以使用Vert.x的元注解库实现类似JAX-RS的REST端点方式,当然最主要是我们获得了更好的可伸缩扩展性,在这段代码底层下面,Vertx使用了Netty作为异步IO操作,这样就可以处理更多并发请求,当然限制于数据库连接池的大小。

上面展示了将前端RESTful的IO从传统的同步模式如Jetty转换到异步IO入Netty方式,我们也可以将访问后端数据库的同步IO放入 Worker Verticles,这样就会更有效率地处理来自客户端的请求,代码可见:Convert-To-Worker-Verticles

原文参考:
Reactive Development Using Vert.x - Java Advent