Java 9新特点: 响应式流Reactive Streams

Java 9将在2017七月发布,它将推出一系列新的和修订后的特点、方法和其他元素。在下面的文章我们将深入探讨JEP 266,看看它提供了哪些改进与新特点:

Reactive Streams(响应式流/反应流)
按照Reactive Manifesto宣言,Reactive目标是使用非阻塞背压方式提供一个标准的异步流处理”。目前主要的挑战不是找不到解决方案,而是要以最大化的互操作性集成不同的现有方案模式到共同的一个应用中。更确切地说,Reactive流目的是“找到最小的一组接口,方法和协议,用来描述必要的操作和实体以实现这样的目标:以非阻塞背压方式实现数据的异步流”。

“背压(反压)back pressure”概念很关键。首先异步消费者会向生产者订阅接收消息,然后当有新的信息可用时,消费者会通过之前订阅时提供的回调函数被再次激活调用。如果生产者发出的信息比消费者能够处理消息最大量还要多,消费者可能会被迫一直在抓消息,耗费越来越多的资源,埋下潜在的崩溃风险。为了防止这一点,需要有一种机制使消费者可以通知生产者,降低消息的生成速度。生产者可以采用多种策略来实现这一要求,这种机制称为背压。

阻塞式背压是容易实现的。例如,当生产者和消费者都是在同一个线程运行时,其中一个将阻止其他线程执行。这意味着,当消费者被执行时,生产者就不能发出任何新的信息,因此需要以自然的方式平衡输入和输出的发生。然而,在有些情况下,阻塞式背压会出现不良问题(例如当生产者有多个消费者,不是所有的人都能以同样的速度消费消息时)或根本达不到降压目的(例如当消费者和生产者在不同环境中运行)。在这些情况下,背压机制以非阻塞的方式工作就是很必要的。

实现非阻塞背压的方法是放弃推push的策略,其他生产者发送消息给消费者等这些都是可保留到拉pull的策略中,消费者会要求生产者生成多少消息量,而且只能最多发送这些量,然后一直等待到下次进一步的请求更多量。

JEP 266:更多的并发更新
在JDK 9 Flow API是对应Reactive Streams规范的,这个规范已经是一个事实上的标准。JEP266包含一个最小的套接口来捕获异步的发布和订阅的核心步骤。希望在未来第三方会实现他们自己且能共享的方式。

这些核心包括:

Publisher
产生用户消费的消息(消息生产者或发布者)。其只有唯一的方法是subscribe(Subscriber),其目的应该是显而易见的,用来订阅的。

Subscriber
订阅publishers(通常只有一个)并从中接收多个消息通过方法OnNext(T),错误消息通过onError(Throwable),如果只有一个消息,肯定不会没有更多的消息量使用oncomplete()。在这些任何的情况发生前,publisher需要调用onSubscription(Subscription)来激活它们。

Subscription
在单一publisher和单一subscriber之间的连接。subscriber会用它来请求更多的消息(request(long))或切断连接(cancel())

flow流程如下:

1. 创建一个Publisher和一个Subscriber
2. 使用Publisher::subscribe订阅Subscriber
3. Publisher创建Subscription 和调用Subscriber::onSubscription,这样Subscriber可以存储这个Subscription。
4. 在某个时候,subscriber 调用Subscription::request请求一批数量的消息。
5.publisher 开始传递消息给它的订阅者subscriber ,这是通过调用Subscriber::onNext实现的。它发布的消息量绝对不会超过请求所要求数量。
6.publisher发布的消息可能会某段时间后全部发布完毕或遇到麻烦,这时调用Subscriber::onCompleteor或Subscriber::onError
7.subscriber 可以在当前连接继续要求更多的消息,或取消连接Subscription::cancel

所有的这一切都是非常直接的,也许除了Subscription::request。为什么用户订阅者subscriber需要这样做?这其实已经在上面实施背压原理里说明了;)

值得注意的是,JDK只提供接口但没有实现(除submissionpublisher)!也没有指定用什么系统比如文件系统来为异步任务创建生产者发布者。所有描述的接口都是类Flow的内部类,具有良好的介绍文档。所以为了真正使用响应性的API,你应该使用某些实施方案:
Reactive Streams,
Project Reactor(将集成到Spring 5),
Akka Streams,
RxJava

Java 9 new Features: Reactive Streams