什么是函数响应式编程(Functional Reactive Programming:FRP)

函数响应式编程(Functional Reactive Programming:FRP)是一种和事件流有关的编程方式,其角度类似EventSoucing,关注导致状态值改变的行为事件,一系列事件组成了事件流。

FRP是更加有效率地处理事件流,而无需显式去管理状态。

具体来说,FRP包括两个核心观点:
1.事件流,离散事件序列
2.属性properties, 代表模型连续的值。

一系列事件是导致属性值发生变化的原因。FRP非常类似于GOF的观察者模式。


为什么需要FRP?
FRP的需求来源于对于多个值发生改变,以javascript为例子,如下:
var a = function (b,c) { return b + c } // a = b + c
其中a实际代表b与c之和,如果b或c持续不断在被改变,如何触发a值也跟着变化呢?
也就是说,上述代码只是一种表达式,并没有指定a值的变化依赖b和c。

使用Reactive.js可以达到指定这种依赖关系,代码如下:


//A = B + C
var reactiveA = $R(function (b, c) { return b + c });
var reactiveB = $R.state(2);
var reactiveC = $R.state(1);
reactiveA.bindTo(reactiveB, reactiveC);

reactiveA();
//-> 3
reactiveB(5);
//Set reactiveB to 5
reactiveC(10);
//Set reactiveC to 10
reactiveA();
//-> 15

我们将b和c可以看成是被观察者,而a作为观察者,如果随着时间推移,b和c的值不断变化,如何将这种变化传导到a呢?

我们将导致b和c被观察者发生变化的一系列事件组成事件流,可以用集合来表达事件流,那么FRP框架所要做的就是,遍历这个事件流集合,将导致b和c的变化的事件重新播放,获得a的一系列值结果。

事件流被称为被观察者序列(observable sequences),其实被观察者是一种Monads。

当存在重复的事件流怎么办?只能选取一个,事件流如果很多,需要进行压缩,所有这些针对事件流(被观察者序列)的额外加工处理需要专门框架实现,RxJava和RxJS分别这样的框架,RxJava是视频巨头Netflix推出的Functional reactive框架。支持java 8的lambda,支持 Groovy, Clojure, Scala 和 JRuby语言。

根据云视频巨头Netflix的Functional Reactive in the Netflix API with RxJava一文:

RxJava作为一个Functional reactive框架,可以提供了如下对被观察者的集合(事件流)处理能力:进行filtering, selecting, transforming, combining 和composing 。

在对被观察的数据类型进行遍历中,消费者(consumer也就是被观察者自身)从生产者(producer观察者)那里拉取poll数值,然后,线程会堵塞等待直到这些数值真正到达获取。反过来说,生产者(producer观察者)在数值可用时,则是将数值推送push给消费者。这样的方式更加灵活,因为数值的获取可以是同步或异步。

以RxJava/groovy代码为例子:


/**
* Asynchronously calls 'customObservableNonBlocking' and defines
* a chain of operators to apply to the callback sequence.
*/

def simpleComposition() {
// fetch an asynchronous Observable<String>
// that emits 75 Strings of 'anotherValue_#'
customObservableNonBlocking()
// skip the first 10
.skip(10)
// take the next 5
.take(5)
// transform each String with the provided function
.map({ stringValue -> return stringValue +
"_transformed"})
// subscribe to the sequence and print each transformed String
.subscribe({ println
"onNext => " + it})
}

上述代码是对事件流集合中从第10个开始5个 事件进行订阅。
// output
onNext => anotherValue_10_transformed
onNext => anotherValue_11_transformed
onNext => anotherValue_12_transformed
onNext => anotherValue_13_transformed
onNext => anotherValue_14_transformed

使用Clojure 实现如下:


(defn simpleComposition []
"Asynchronously calls 'customObservableNonBlocking' and defines a
chain of operators to apply to the callback sequence.
"
(->
; fetch an asynchronous Observable<String>
; that emits 75 Strings of 'anotherValue_#'
(customObservableNonBlocking)
; skip the first 10
(.skip 10)
; take the next 5
(.take 5)
; transform each String with the provided function
(.map #(str %
"_transformed"))
; subscribe to the sequence and print each transformed String
(.subscribe #(println
"onNext =>" %))))

; output
onNext => anotherValue_10_transformed
onNext => anotherValue_11_transformed
onNext => anotherValue_12_transformed
onNext => anotherValue_13_transformed
onNext => anotherValue_14_transformed

值得注意的是,Java JDK提供的Futures并不能很好完成FRP,因为进行嵌套组合时带来了复杂性。

FRP的Javascript框架RxJS:https://github.com/Reactive-Extensions/RxJS
还有:Reactive.js: Functional Reactive Programming in Javascript

FRP的Java框架RxJava:https://github.com/Netflix/RxJava

Functional Reactive Programming in Dart

Functional Reactive Programming in Clojurescript