函数响应式编程(Functional Reactive Programming:FRP)是一种和事件流有关的编程方式,其角度类似EventSoucing,关注导致状态值改变的行为事件,一系列事件组成了事件流。
FRP是更加有效率地处理事件流,而无需显式去管理状态。
具体来说,FRP包括两个核心观点:
1.事件流,离散事件序列
2.属性properties, 代表模型连续的值。
一系列事件是导致属性值发生变化的原因。FRP非常类似于GOF的观察者模式。
为什么需要FRP?
FRP的需求来源于对于多个值发生改变,以javascript为例子,如下:
var a = function (b,c) { return b + c } // a = b + c
其中a实际代表b与c之和,如果b或c持续不断在被改变,如何触发a值也跟着变化呢?
也就是说,上述代码只是一种表达式,并没有指定a值的变化依赖b和c。
使用Reactive.js可以达到指定这种依赖关系,代码如下:
//A = B + C var reactiveA = $R(function (b, c) { return b + c }); var reactiveB = $R.state(2); var reactiveC = $R.state(1); reactiveA.bindTo(reactiveB, reactiveC);
reactiveA(); //-> 3 reactiveB(5); //Set reactiveB to 5 reactiveC(10); //Set reactiveC to 10 reactiveA(); //-> 15
|
我们将b和c可以看成是被观察者,而a作为观察者,如果随着时间推移,b和c的值不断变化,如何将这种变化传导到a呢?
我们将导致b和c被观察者发生变化的一系列事件组成事件流,可以用集合来表达事件流,那么FRP框架所要做的就是,遍历这个事件流集合,将导致b和c的变化的事件重新播放,获得a的一系列值结果。
事件流被称为被观察者序列(observable sequences),其实被观察者是一种Monads。
当存在重复的事件流怎么办?只能选取一个,事件流如果很多,需要进行压缩,所有这些针对事件流(被观察者序列)的额外加工处理需要专门框架实现,RxJava和RxJS分别这样的框架,RxJava是视频巨头Netflix推出的Functional reactive框架。支持java 8的lambda,支持 Groovy, Clojure, Scala 和 JRuby语言。
根据云视频巨头Netflix的Functional Reactive in the Netflix API with RxJava一文:
RxJava作为一个Functional reactive框架,可以提供了如下对被观察者的集合(事件流)处理能力:进行filtering, selecting, transforming, combining 和composing 。
在对被观察的数据类型进行遍历中,消费者(consumer也就是被观察者自身)从生产者(producer观察者)那里拉取poll数值,然后,线程会堵塞等待直到这些数值真正到达获取。反过来说,生产者(producer观察者)在数值可用时,则是将数值推送push给消费者。这样的方式更加灵活,因为数值的获取可以是同步或异步。
以RxJava/groovy代码为例子:
/** * Asynchronously calls 'customObservableNonBlocking' and defines * a chain of operators to apply to the callback sequence. */ def simpleComposition() { // fetch an asynchronous Observable<String> // that emits 75 Strings of 'anotherValue_#' customObservableNonBlocking() // skip the first 10 .skip(10) // take the next 5 .take(5) // transform each String with the provided function .map({ stringValue -> return stringValue + "_transformed"}) // subscribe to the sequence and print each transformed String .subscribe({ println "onNext => " + it}) }
上述代码是对事件流集合中从第10个开始5个 事件进行订阅。 // output onNext => anotherValue_10_transformed onNext => anotherValue_11_transformed onNext => anotherValue_12_transformed onNext => anotherValue_13_transformed onNext => anotherValue_14_transformed
|
使用Clojure 实现如下:
(defn simpleComposition [] "Asynchronously calls 'customObservableNonBlocking' and defines a chain of operators to apply to the callback sequence." (-> ; fetch an asynchronous Observable<String> ; that emits 75 Strings of 'anotherValue_#' (customObservableNonBlocking) ; skip the first 10 (.skip 10) ; take the next 5 (.take 5) ; transform each String with the provided function (.map #(str % "_transformed")) ; subscribe to the sequence and print each transformed String (.subscribe #(println "onNext =>" %)))) ; output onNext => anotherValue_10_transformed onNext => anotherValue_11_transformed onNext => anotherValue_12_transformed onNext => anotherValue_13_transformed onNext => anotherValue_14_transformed
|
值得注意的是,Java JDK提供的Futures并不能很好完成FRP,因为进行嵌套组合时带来了复杂性。
FRP的Javascript框架RxJS:https://github.com/Reactive-Extensions/RxJS
还有:Reactive.js: Functional Reactive Programming in Javascript
FRP的Java框架RxJava:https://github.com/Netflix/RxJava
Functional Reactive Programming in Dart
Functional Reactive Programming in Clojurescript