征服并发 - 在安卓Android平台引入Rx:Reactive Ext

13-10-26 banq
Conquering concurrency - bringing the Reactive Ext这最新PPT诠释了如何在客户端安卓上使用Reactive Extension。

在安卓平台上,通常有几个问题难以优雅实现,如ResultReceiver结果接受,Custom Listeners BroadcastReceriver广播接收 ContentObserver争夺观察 加载回调Loader callbacks和Pull-to-refresh callbacks等。

而基于JVM的函数式reactive编程如RxJava能够解决这些问题。

在传统imperative编程范式下:
如果 int x=1; int y=x+1;
那么当x=2时,y是多少呢?还是2。

而在reactive编程范式下:
如果int x=1; Func(int) y = () -->{x+1}
那么当x=2时,y将会是3, y值能够跟随x即时反应变化。

传统imperative编程 + declarative声明+lazy evaluation懒赋值=Reactive编程

Reactive编程+higher order functions高阶函数 composition组合=
Functional reactive programming(简称FRP 函数式反应编程)。

那么这些对于移动应用意味着什么呢?

首先,目前事实是,UI驱动的应用都是很自然地基于事件和响应reactive的;其次,数据都是来自后端Web。

那么我们的编程风格也应该反应这种事实:
1. 异步 声明式的API (ask to construct)
2.事件作为可观察的序列。
3.积极拥抱面对失败,不回避。

原来的安卓本身提供的异步任务AsyncTask如何?
1.单线程,使用Future+Handler,而Futuer在复杂嵌套情况下混乱。
2.非常易于泄漏Context
3.无出错专门处理
4.非组合式。

那么一些EventBus如何?
1.没有内建出错处理。
2.事件并不能组合嵌套。
3.设计为全部 共享状态。

再看看RxJava,主要接口有可观察Observable和观察者Observer。
Observable能够做事件作为可观察的序列:

Observable.create((observer) -> {
         for (int i = 1; i <= 3; i++) {
            observer.onNext(i);
         }
        observer.onCompleted();
}).subscribe(intObserver);
//发出结果值是 1, 2, 3
<p class="indent">


Observer观察者代码如下:

Observer<Integer> intObserver = new Observer<Integer> {
    public void onNext(Integer value) {
          System.out.println(value);
    }
    public void onCompleted() {
           System.out.println(“Done!”);
    }
    public void onError(Throwable t) { … }
}
<p class="indent">


组合Composition实现如下:

// Observable from previous example
observable.map((i) -> { return i * 2; }).subscribe(...)

//标准输出:
2
4
6
Done!
<p class="indent">


Schedulers用来实现参数化的并发:

// observable from previous example
observable
    .subscribeOn(Schedulers.newThread())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(intObserver);
<p class="indent">


具体在安卓中使用:
将服务Service暴露为Observable<T>
实现一些Reactive组件,如adapters

比如从Service中获得结果放入Observer中:

//e.g. in onCreate
Observable<Track> observable =
           AndroidObservables.fromFragment(
                   this, service.loadTracks())
                       .subscribe(this)
<p class="indent">


其中service.loadTracks代码:

public Observable<Track> loadTracks() {
APIRequest<Track> request = /* build request */
return mRxHttpClient.fetchModels(request);
<p class="indent">


而其中fechModel代码如下:

public Observable<T> fetchModels(APIRequest request) {
    return fetchResponse(request).mapMany((response) -> {
          return mapResponseToModels(request, response);
     });
}
<p class="indent">


好处:
1.简单统一的事件模型
onNext* → onCompleted | onError
2. 重用: 异步任务组合的声明定义
3. 易于测试: 并发是可参数化的。

[该贴被banq于2013-10-26 08:51修改过]