什么是函数响应式编程(Functional Reactive Programming:FRP)

13-07-10 banq
                   

函数响应式编程(Functional Reactive Programming:FRP)是一种和事件流有关的编程方式,其角度类似EventSoucing,关注导致状态值改变的行为事件,一系列事件组成了事件流。

FRP是更加有效率地处理事件流,而无需显式去管理状态。

具体来说,FRP包括两个核心观点:

1.事件流,离散事件序列

2.属性properties, 代表模型连续的值。

一系列事件是导致属性值发生变化的原因。FRP非常类似于GOF的观察者模式。

为什么需要FRP?

FRP的需求来源于对于多个值发生改变,以javascript为例子,如下:

var a = function (b,c) { return b + c } // a = b + c

其中a实际代表b与c之和,如果b或c持续不断在被改变,如何触发a值也跟着变化呢?

也就是说,上述代码只是一种表达式,并没有指定a值的变化依赖b和c。

使用Reactive.js可以达到指定这种依赖关系,代码如下:

//A = B + C
    var reactiveA = $R(function (b, c) { return b + c });
    var reactiveB = $R.state(2);
    var reactiveC = $R.state(1);
    reactiveA.bindTo(reactiveB, reactiveC);

    reactiveA();   //-> 3
    reactiveB(5);  //Set reactiveB to 5
    reactiveC(10); //Set reactiveC to 10
    reactiveA();   //-> 15
<p>

我们将b和c可以看成是被观察者,而a作为观察者,如果随着时间推移,b和c的值不断变化,如何将这种变化传导到a呢?

我们将导致b和c被观察者发生变化的一系列事件组成事件流,可以用集合来表达事件流,那么FRP框架所要做的就是,遍历这个事件流集合,将导致b和c的变化的事件重新播放,获得a的一系列值结果。

事件流被称为被观察者序列(observable sequences),其实被观察者是一种Monads。

当存在重复的事件流怎么办?只能选取一个,事件流如果很多,需要进行压缩,所有这些针对事件流(被观察者序列)的额外加工处理需要专门框架实现,RxJava和RxJS分别这样的框架,RxJava是视频巨头Netflix推出的Functional reactive框架。支持java 8的lambda,支持 Groovy, Clojure, Scala 和 JRuby语言。

根据云视频巨头Netflix的Functional Reactive in the Netflix API with RxJava一文:

RxJava作为一个Functional reactive框架,可以提供了如下对被观察者的集合(事件流)处理能力:进行filtering, selecting, transforming, combining 和composing 。

在对被观察的数据类型进行遍历中,消费者(consumer也就是被观察者自身)从生产者(producer观察者)那里拉取poll数值,然后,线程会堵塞等待直到这些数值真正到达获取。反过来说,生产者(producer观察者)在数值可用时,则是将数值推送push给消费者。这样的方式更加灵活,因为数值的获取可以是同步或异步。

以RxJava/groovy代码为例子:

/**
 * Asynchronously calls 'customObservableNonBlocking' and defines 
 * a chain of operators to apply to the callback sequence.
 */
def simpleComposition() {
  // fetch an asynchronous Observable<String> 
  // that emits 75 Strings of 'anotherValue_#'
  customObservableNonBlocking()
    // skip the first 10
    .skip(10)
    // take the next 5
    .take(5)
    // transform each String with the provided function
    .map({ stringValue -> return stringValue + "_transformed"})
    // subscribe to the sequence and print each transformed String
    .subscribe({ println "onNext => " + it})
}

上述代码是对事件流集合中从第10个开始5个 事件进行订阅。
// output
onNext => anotherValue_10_transformed
onNext => anotherValue_11_transformed
onNext => anotherValue_12_transformed
onNext => anotherValue_13_transformed
onNext => anotherValue_14_transformed
<p>

使用Clojure 实现如下:

(defn simpleComposition []
  "Asynchronously calls 'customObservableNonBlocking' and defines a 
   chain of operators to apply to the callback sequence."
  (-> 
    ; fetch an asynchronous Observable<String> 
    ; that emits 75 Strings of 'anotherValue_#'
    (customObservableNonBlocking)
    ; skip the first 10
    (.skip 10)
    ; take the next 5
    (.take 5)
    ; transform each String with the provided function
    (.map #(str % "_transformed"))
    ; subscribe to the sequence and print each transformed String
    (.subscribe #(println "onNext =>" %))))
 
; output
onNext => anotherValue_10_transformed
onNext => anotherValue_11_transformed
onNext => anotherValue_12_transformed
onNext => anotherValue_13_transformed
onNext => anotherValue_14_transformed
<p>

值得注意的是,Java JDK提供的Futures并不能很好完成FRP,因为进行嵌套组合时带来了复杂性。

FRP的Javascript框架RxJS:https://github.com/Reactive-Extensions/RxJS

还有:Reactive.js: Functional Reactive Programming in Javascript

FRP的Java框架RxJava:https://github.com/Netflix/RxJava

Functional Reactive Programming in Dart

Functional Reactive Programming in Clojurescript

                   

4