基于Vert.x和SpringBoot实现响应式开发
16-01-06
banq
Vert.x是作为一个事件总线的设计,以保证应用中不同部分以一种非堵塞的线程安全方式通讯,其原理来自于Erlang和Akka,它是能充分利用多核处理器性能并实现高并发编程的需求。
所有Vert.x 的VERTICLE缺省是一个单线程,不像Node.js只有一个单线程,vert.x能在很多线程中运行很多VERTICLE,每个线程一个VERTICLE,这样你可以指定几个VERTICLE作为"worker",能够以多线程方式运行你的任务。
Vert.x可以通过使用Hazelcast实现底层的事件总线的多节点集群。
下面以一个案例说明如何基于 Spring Boot, Spring Data JPA, 和 Spring REST开发一个聊天室系统?
下面没有引入Vert.x而是使用传统Spting语法实现的系统:
@SpringBootApplication @EnableJpaRepositories @EnableTransactionManagement @Slf4j public class Application { public static void main(String[] args) { ApplicationContext ctx = SpringApplication.run(Application.class, args); System.out.println("Let's inspect the beans provided by Spring Boot:"); String[] beanNames = ctx.getBeanDefinitionNames(); Arrays.sort(beanNames); for (String beanName : beanNames) { System.out.println(beanName); } } @Bean public DataSource dataSource() { EmbeddedDatabaseBuilder builder = new EmbeddedDatabaseBuilder(); return builder.setType(EmbeddedDatabaseType.HSQL).build(); } @Bean public EntityManagerFactory entityManagerFactory() { HibernateJpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter(); vendorAdapter.setGenerateDdl(true); LocalContainerEntityManagerFactoryBean factory = new LocalContainerEntityManagerFactoryBean(); factory.setJpaVendorAdapter(vendorAdapter); factory.setPackagesToScan("com.zanclus.data.entities"); factory.setDataSource(dataSource()); factory.afterPropertiesSet(); return factory.getObject(); } @Bean public PlatformTransactionManager transactionManager(final EntityManagerFactory emf) { final JpaTransactionManager txManager = new JpaTransactionManager(); txManager.setEntityManagerFactory(emf); return txManager; } } <p class="indent"> |
上述代码中 @Bean是提供了访问JPA的EntityManager, TransactionManager, 和 DataSource. 这是一个标准的Springboot案例源码。前端使用CustomerEnpoints作为RESTful的控制器,接受客户端的请求。
CustomerVerticle 类是作为@Component,意味着在启动时,Spring会初始化这个类,它也有@PostConstruct标注的start方法,这样Verticle 在启动被加载时会执行其方法内容:
@PostConstruct public void start() throws Exception { Router router = Router.router(vertx); router.route().handler(BodyHandler.create()); router.get("/v1/customer/:id") .produces("application/json") .blockingHandler(this::getCustomerById); router.put("/v1/customer") .consumes("application/json") .produces("application/json") .blockingHandler(this::addCustomer); router.get("/v1/customer") .produces("application/json") .blockingHandler(this::getAllCustomers); vertx.createHttpServer().requestHandler(router::accept).listen(8080); } <p class="indent"> |
在这个启动方法中,引入了vertx-web库包:Router,能够让用户定义将请求过滤转换为HTTP URLs, methods, 和头部header 等信息,BodyHandler 是能将POST/PUT提交的内容转变为一个JSON对象,Vert.x能够将其作为RoutingContext的一部分进行处理;RoutingContext包含Vert.x请求对象和响应对象和任何Http请求中数据,包括POST内容数据等等,blockingHandler是接受RoutingContext作为输入参数。
注意到blockingHandler方法使用了Java 8的方法引用,如this::getCustomerById,这比使用lambda将逻辑插入blockingHandler 中更具有代码可读性。getCustomerById的方法如下:
private void getCustomerById(RoutingContext rc) { log.info("Request for single customer"); Long id = Long.parseLong(rc.request().getParam("id")); try { Customer customer = dao.findOne(id); if (customer==null) { rc.response().setStatusMessage("Not Found").setStatusCode(404).end("Not Found"); } else { rc.response().setStatusMessage("OK").setStatusCode(200).end(mapper.writeValueAsString(dao.findOne(id))); } } catch (JsonProcessingException jpe) { rc.response().setStatusMessage("Server Error").setStatusCode(500).end("Server Error"); log.error("Server error", jpe); } } <p class="indent"> |
在getCustomerById方法中实现真正的业务处理,比如调用DAO访问数据库,序列化和反序列化对象,类似在MVC的控制器中实现一样。
完整见Github
这段代码的主要问题是性能的扩展性有限,这段代码要么可以在Tomcat中运行,要么以微服务方式在嵌入式服务器如Jetty或undertow中运行,总之,只能是一个请求捆绑一个线程,当在等待I/O等堵塞操作时,所有的资源都会消耗在等待上,这种局部堵塞引发整体等待是一种资源浪费。
而如果我们引入了Vert.x,使用@Bean注入了Vertx实例:
public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } private Vertx vertx; /** * Create an {@link ObjectMapper} for use in (de)serializing objects to/from JSON * @return An instance of {@link ObjectMapper} */ @Bean public ObjectMapper objectMapper() { return new ObjectMapper(new JsonFactory()); } /** * A singleton instance of {@link Vertx} which is used throughout the application * @return An instance of {@link Vertx} */ @Bean public Vertx getVertxInstance() { if (this.vertx==null) { this.vertx = Vertx.vertx(); } return this.vertx; } .... <p class="indent"> |
访问JPA的代码基本没有变,主要是RESTful控制器更换了,使用CustomerVerticle替代了CustomerEnpoints。
完整代码见:Convert-To-Vert.x-Web
你可能觉得这比原来Spring代码更复杂了些,这里只是介绍案例源码,真正实战中可以使用Vert.x的元注解库实现类似JAX-RS的REST端点方式,当然最主要是我们获得了更好的可伸缩扩展性,在这段代码底层下面,Vertx使用了Netty作为异步IO操作,这样就可以处理更多并发请求,当然限制于数据库连接池的大小。
上面展示了将前端RESTful的IO从传统的同步模式如Jetty转换到异步IO入Netty方式,我们也可以将访问后端数据库的同步IO放入 Worker Verticles,这样就会更有效率地处理来自客户端的请求,代码可见:Convert-To-Worker-Verticles
原文参考: