Spring WebFlux和Reactive编程


在看到Jurgen Hoeller引入新的Spring 5功能后,我终于开始尝试在尚未发布的Spring Boot 2.0.0 Snapshot中尝试新的Spring WebFlux项目。开始吧:

Maven WebFlux项目生成

  • 转到Spring启动应用程序生成器
  • 在Spring Boot版本中选择“2.0.0”以上版本
  • 在依赖项中搜索“ Reactive Web ”
  • 保存生成的maven项目

演示(反应端点)
在刚刚生成的Spring WebFlux项目中,让我们构建一个REST端点,以Reactive方式获取通用存储Item 。首先让我们开始:

@RestController
public class ItemsReactiveController {

    @Autowired
    private IItemsService iItemsService;

    public Flux<Item> findById(String id) {
        try {
            System.out.println("Getting the data from DB...");
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return Flux.just(iItemsService.findById(id));
    }

    @GetMapping(value =
"/store/{id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public void getItemById(@PathVariable String id) {
        System.out.println(
"Controller start....");
        findById(id).subscribe(new Subscriber<Item>() {

            @Override
            public void onSubscribe(Subscription s) {
                s.request(Long.MAX_VALUE);
            }
            @Override
            public void onNext(Item t) {
                System.out.println(
"Retrieved: "+t.getName());
            }
            @Override
            public void onError(Throwable t) {
            }
            @Override
            public void onComplete() {
            }
        });
        System.out.println(
"End of method.");
    }
}

代码详细
方法findById返回Flux <Item>类型。Flux是一种以反应方式返回0..N项的数据类型。另外一种可能性是使用返回0或1项的Mono<Item>类型。
Jurgen Hoeller清楚地提到如果端点返回上述数据类型之一,那么实际上没有返回结果,但是Spring给调用者一个管道,其中结果将最终落地。正如您从NodeJS所知,但是在Spring方式中,引擎盖下的机制非常接近EventLoop。要从Reactive端点获取任何数据,您需要订阅返回的管道。如果您想获得结果的回调,那么管道上的订阅阶段或错误就会被使用来自反应流的订阅者,这是Project reactor的底层实现。

第一次测试:
项目源码:https://bitbucket.org/tomask79/spring-reactive-rest.git
让我们用Oracle Store中现有Item的{id}调用先前创建的端点(我不会厌烦使用的JPA配置,它不是演示的主题)。点击浏览器:http://localhost:8081/store/{id}

系统输出:

Controller start....
Getting the data from DB...
[EL Fine]: sql: 2017-03-20 14:19:56.321--ServerSession(26882836)--Connection(29914401)--SELECT ID, ITEM_NAME FROM ITEMS WHERE (ID = ?)
        bind => [1 parameter bound]
Retrieved: <Name of the Item>
End of method.

如您所见,代码输出每个步骤:

  • 调用控制器(Controller start ....)
  • 获取item调用服务(Retrieved: <Name of the Item>)
  • 达到了方法的结束。(End of method)

默认情况下,从主线程获取订阅数据,当然因为我使用的数据存储不提供反应式驱动程序(Oracle),因此调用存储是阻塞的。目前,支持反应式编程(解锁通话)的存储是:
  • Mongo
  • Redis
  • Cassandra
  • Postgres

当然,激活检查新项目Spring Data“Kay”,才能在上面提到的Spring Data项目中启用反应范例。

要实际启用异步发布我们的项目,我们需要将控制器更改为:

package com.example.controller;

import com.example.domain.Item;
import com.example.service.IItemsService;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/**
 * Created by Tomas.Kloucek on 17.3.2017.
 */

@RestController
public class ItemsReactiveController {

    @Autowired
    private IItemsService iItemsService;

    public Flux<Item> findById(String id) {
        try {
            System.out.println(
"Getting the data from DB...");
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return Flux.just(iItemsService.findById(id)).publishOn(Schedulers.parallel());
    }

    @GetMapping(value =
"/store/{id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public void getItemById(@PathVariable String id) {
        System.out.println(
"Controller start....");
        findById(id).subscribe(v -> {
           System.out.println(
"Consumed: "+v.getId());
        });
        System.out.println(
"End of method.");
    }
}

  • 改变了订阅只获得结果。如果你知道RxJava你应该熟悉。
  • 添加了publishOn方法,并调度了用于异步发布Item的线程。

现在,如果我们从浏览器再次点击端点,输出将是:

Controller start....
Getting the data from DB...
[EL Fine]: sql: 2017-03-20 14:16:49.69--ServerSession(18245293)--Connection(9048111)--SELECT ID, ITEM_NAME FROM ITEMS WHERE (ID = ?)
        bind => [1 parameter bound]
End of method.
Consumed: <ID of your Item entity>

正如您所看到的,Spring在给出订阅请求数据之前就已到达方法的最后(End of method)。

如何创建客户端以调用Reactive Endpoint
让我们用代码创建另一个Spring Boot WebReactive应用程序:

@SpringBootApplication
public class DemoApplication {

    @Bean
    public WebClient webClient() {
        return WebClient.create("http://<reactiveAppHost>:<reactiveAppPort>");
    }

    @Bean
    CommandLineRunner launch(WebClient webClient) {
        return args -> {
            webClient.get().Yuri(
"/store/{id}")
                    .accept(MediaType.TEXT_EVENT_STREAM)
                    .exchange()
                    .flatMap(cr -> cr.bodyToFlux(Item.class))
                    .subscribe(v -> {
                        System.out.println(
"Received from MS: "+v.getName());
                    });
        };
    }

    public static void main(String args[]) {
        new SpringApplicationBuilder(DemoApplication.class).properties
                (Collections.singletonMap(
"server.port", "8082"))
                .run(args);
    }
}

代码详细:
要调用Reactive端点,您需要首先获取WebClient实例。在演示案例中放入create方法http://localhost:8081.。由WebClient.exchange()方法调用执行的自调用方法,
但要实际在管道上放置订阅以获取结果,您需要调用ClientRequest.bodyToFlux(<ResultClassMapping> .class),这种订阅才是可能的。如果我们运行这个应用程序,那么结果应该是:

Started DemoApplication in 4.631 seconds (JVM running for 4.954)
Received from MS: <Item text>

这部分客户端代码:
git clone https://tomask79@bitbucket.org/tomask79/spring-reactive-rest-client.git

用于异步调用REST端点的API是否必要?
我对这种订阅和异步调用端点的新反应趋势的观点是一种悲观。如果我需要异步调用端点,那么JMS或AMPQ是第一个让我进入大脑的想法,特别是在MicroServices中。但我们会看到这将如何发展。Spring Framework 5中的其他计划更改很有希望:

  • 带有Angular的函数框架,类似Router
  • 支持Project Jigsaw
  • 注册beanLamdas。 

​​​​​​​本站文章点击标题看原文!