在看到Jurgen Hoeller引入新的Spring 5功能后,我终于开始尝试在尚未发布的Spring Boot 2.0.0 Snapshot中尝试新的Spring WebFlux项目。开始吧:
Maven WebFlux项目生成
- 转到Spring启动应用程序生成器
- 在Spring Boot版本中选择“2.0.0”以上版本
- 在依赖项中搜索“ Reactive Web ”
- 保存生成的maven项目
演示(反应端点)
在刚刚生成的Spring WebFlux项目中,让我们构建一个REST端点,以Reactive方式获取通用存储Item 。首先让我们开始:@RestController public class ItemsReactiveController {
@Autowired private IItemsService iItemsService;
public Flux<Item> findById(String id) { try { System.out.println("Getting the data from DB..."); Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } return Flux.just(iItemsService.findById(id)); }
@GetMapping(value = "/store/{id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public void getItemById(@PathVariable String id) { System.out.println("Controller start...."); findById(id).subscribe(new Subscriber<Item>() {
@Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); } @Override public void onNext(Item t) { System.out.println("Retrieved: "+t.getName()); } @Override public void onError(Throwable t) { } @Override public void onComplete() { } }); System.out.println("End of method."); } }
|
代码详细
方法findById返回Flux <Item>类型。Flux是一种以反应方式返回0..N项的数据类型。另外一种可能性是使用返回0或1项的Mono<Item>类型。
Jurgen Hoeller清楚地提到如果端点返回上述数据类型之一,那么实际上没有返回结果,但是Spring给调用者一个管道,其中结果将最终落地。正如您从NodeJS所知,但是在Spring方式中,引擎盖下的机制非常接近EventLoop。要从Reactive端点获取任何数据,您需要订阅返回的管道。如果您想获得结果的回调,那么管道上的订阅阶段或错误就会被使用来自反应流的订阅者,这是Project reactor的底层实现。第一次测试:
项目源码:https://bitbucket.org/tomask79/spring-reactive-rest.git
让我们用Oracle Store中现有Item的{id}调用先前创建的端点(我不会厌烦使用的JPA配置,它不是演示的主题)。点击浏览器:http://localhost:8081/store/{id}
系统输出:
Controller start.... Getting the data from DB... [EL Fine]: sql: 2017-03-20 14:19:56.321--ServerSession(26882836)--Connection(29914401)--SELECT ID, ITEM_NAME FROM ITEMS WHERE (ID = ?) bind => [1 parameter bound] Retrieved: <Name of the Item> End of method.
|
如您所见,代码输出每个步骤:
- 调用控制器(Controller start ....)
- 获取item调用服务(Retrieved: <Name of the Item>)
- 达到了方法的结束。(End of method)
默认情况下,从主线程获取订阅数据,当然因为我使用的数据存储不提供反应式驱动程序(Oracle),因此调用存储是阻塞的。目前,支持反应式编程(解锁通话)的存储是:
- Mongo
- Redis
- Cassandra
- Postgres
当然,激活检查新项目Spring Data“Kay”,才能在上面提到的Spring Data项目中启用反应范例。要实际启用异步发布我们的项目,我们需要将控制器更改为:
package com.example.controller;
import com.example.domain.Item; import com.example.service.IItemsService; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Flux; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers;
/** * Created by Tomas.Kloucek on 17.3.2017. */ @RestController public class ItemsReactiveController {
@Autowired private IItemsService iItemsService;
public Flux<Item> findById(String id) { try { System.out.println("Getting the data from DB..."); Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } return Flux.just(iItemsService.findById(id)).publishOn(Schedulers.parallel()); }
@GetMapping(value = "/store/{id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public void getItemById(@PathVariable String id) { System.out.println("Controller start...."); findById(id).subscribe(v -> { System.out.println("Consumed: "+v.getId()); }); System.out.println("End of method."); } }
|
- 改变了订阅只获得结果。如果你知道RxJava你应该熟悉。
- 添加了publishOn方法,并调度了用于异步发布Item的线程。
现在,如果我们从浏览器再次点击端点,输出将是:Controller start.... Getting the data from DB... [EL Fine]: sql: 2017-03-20 14:16:49.69--ServerSession(18245293)--Connection(9048111)--SELECT ID, ITEM_NAME FROM ITEMS WHERE (ID = ?) bind => [1 parameter bound] End of method. Consumed: <ID of your Item entity>
|
正如您所看到的,Spring在给出订阅请求数据之前就已到达方法的最后(End of method)。
如何创建客户端以调用Reactive Endpoint
让我们用代码创建另一个Spring Boot WebReactive应用程序:
@SpringBootApplication public class DemoApplication {
@Bean public WebClient webClient() { return WebClient.create("http://<reactiveAppHost>:<reactiveAppPort>"); }
@Bean CommandLineRunner launch(WebClient webClient) { return args -> { webClient.get().Yuri("/store/{id}") .accept(MediaType.TEXT_EVENT_STREAM) .exchange() .flatMap(cr -> cr.bodyToFlux(Item.class)) .subscribe(v -> { System.out.println("Received from MS: "+v.getName()); }); }; }
public static void main(String args[]) { new SpringApplicationBuilder(DemoApplication.class).properties (Collections.singletonMap("server.port", "8082")) .run(args); } }
|
代码详细:
要调用Reactive端点,您需要首先获取WebClient实例。在演示案例中放入create方法http://localhost:8081.。由WebClient.exchange()方法调用执行的自调用方法,
但要实际在管道上放置订阅以获取结果,您需要调用ClientRequest.bodyToFlux(<ResultClassMapping> .class),这种订阅才是可能的。如果我们运行这个应用程序,那么结果应该是:
Started DemoApplication in 4.631 seconds (JVM running for 4.954) Received from MS: <Item text>
|
这部分客户端代码:
git clone https://tomask79@bitbucket.org/tomask79/spring-reactive-rest-client.git
用于异步调用REST端点的API是否必要?
我对这种订阅和异步调用端点的新反应趋势的观点是一种悲观。如果我需要异步调用端点,那么JMS或AMPQ是第一个让我进入大脑的想法,特别是在MicroServices中。但我们会看到这将如何发展。Spring Framework 5中的其他计划更改很有希望:
- 带有Angular的函数框架,类似Router
- 支持Project Jigsaw
- 注册beanLamdas。
本站文章点击标题看原文!