Spring团队开发了Project Reactor,以支持Spring生态系统中的反应式工作。您不需要Spring即可使用Project Reactor,但是Spring生态系统中的所有反应式API均基于Project Reactor来提供数据流选项。
让我们看一个示例,该示例说明Project Reactor如何简化不同数据流源和接收器的组合工作,并消除所有手动线程化代码。
您需要以下依赖项:
org.springframework.boot:spring-boot-starter-webflux
下面示例说明了在给定不同数据类型的情况下标准化处理的难易程度。在此示例中,我们比较一下Java 8的java.util.Stream<T?>和a CompletableFuture<T>,在大多数反应式应用程序中,您不必从事将非反应性类型转换为反应性类型(如Flux<T>或Mono<T>)的业务。这些例子将更加直接。本示例假定您有两个数据源,需要将它们组合在一起。
package bootiful.rx;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
@SpringBootApplication
public class BootifulApplication {
CompletableFuture<String> returnCompletableFuture(int counter) {
return CompletableFuture.supplyAsync(() -> {
var start = System.currentTimeMillis();
try {
Thread.sleep((long) (Math.max((Math.random() * 10), 5) * 1000));
}
catch (InterruptedException e) {
// threads smdh
}
var stop = System.currentTimeMillis();
var delta = stop - start;
return "(" + Thread.currentThread().getName() + ") Hello, #" + counter + "! (after " + delta + " ms.)";
});
}
Stream<Integer> returnStream() {
return Stream.iterate(0, integer -> integer + 1);
}
@Bean
ApplicationListener<ApplicationReadyEvent> begin() {
return event -> {
Flux<String> count = Flux//
.fromStream(this.returnStream()) //
.take(10) //
.flatMap(c -> Flux.zip(Mono.just(c), Mono.fromCompletionStage(this.returnCompletableFuture(c)))) //
.map(tuple -> tuple.getT2() + " #" + tuple.getT1()); //
count.subscribe(System.out::println);
};
}
public static void main(String[] args) {
SpringApplication.run(BootifulApplication.class, args);
}
}
|