征服并发 - 在安卓Android平台引入Rx:Reactive Ext

Conquering concurrency - bringing the Reactive Ext这最新PPT诠释了如何在客户端安卓上使用Reactive Extension。

在安卓平台上,通常有几个问题难以优雅实现,如ResultReceiver结果接受,Custom Listeners BroadcastReceriver广播接收 ContentObserver争夺观察 加载回调Loader callbacks和Pull-to-refresh callbacks等。

而基于JVM的函数式reactive编程如RxJava能够解决这些问题。

在传统imperative编程范式下:
如果 int x=1; int y=x+1;
那么当x=2时,y是多少呢?还是2。

而在reactive编程范式下:
如果int x=1; Func(int) y = () -->{x+1}
那么当x=2时,y将会是3, y值能够跟随x即时反应变化。

传统imperative编程 + declarative声明+lazy evaluation懒赋值=Reactive编程

Reactive编程+higher order functions高阶函数 composition组合=
Functional reactive programming(简称FRP 函数式反应编程)。

那么这些对于移动应用意味着什么呢?

首先,目前事实是,UI驱动的应用都是很自然地基于事件和响应reactive的;其次,数据都是来自后端Web。

那么我们的编程风格也应该反应这种事实:
1. 异步 声明式的API (ask to construct)
2.事件作为可观察的序列。
3.积极拥抱面对失败,不回避。

原来的安卓本身提供的异步任务AsyncTask如何?
1.单线程,使用Future+Handler,而Futuer在复杂嵌套情况下混乱。
2.非常易于泄漏Context
3.无出错专门处理
4.非组合式。

那么一些EventBus如何?
1.没有内建出错处理。
2.事件并不能组合嵌套。
3.设计为全部 共享状态。

再看看RxJava,主要接口有可观察Observable和观察者Observer。
Observable能够做事件作为可观察的序列:


Observable.create((observer) -> {
for (int i = 1; i <= 3; i++) {
observer.onNext(i);
}
observer.onCompleted();
}).subscribe(intObserver);
//发出结果值是 1, 2, 3

Observer观察者代码如下:


Observer<Integer> intObserver = new Observer<Integer> {
public void onNext(Integer value) {
System.out.println(value);
}
public void onCompleted() {
System.out.println(“Done!”);
}
public void onError(Throwable t) { … }
}

组合Composition实现如下:


// Observable from previous example
observable.map((i) -> { return i * 2; }).subscribe(...)

//标准输出:
2
4
6
Done!

Schedulers用来实现参数化的并发:


// observable from previous example
observable
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(intObserver);

具体在安卓中使用:
将服务Service暴露为Observable<T>
实现一些Reactive组件,如adapters

比如从Service中获得结果放入Observer中:


//e.g. in onCreate
Observable<Track> observable =
AndroidObservables.fromFragment(
this, service.loadTracks())
.subscribe(this)

其中service.loadTracks代码:


public Observable<Track> loadTracks() {
APIRequest<Track> request = /* build request */
return mRxHttpClient.fetchModels(request);

而其中fechModel代码如下:


public Observable<T> fetchModels(APIRequest request) {
return fetchResponse(request).mapMany((response) -> {
return mapResponseToModels(request, response);
});
}

好处:
1.简单统一的事件模型
onNext* → onCompleted | onError
2. 重用: 异步任务组合的声明定义
3. 易于测试: 并发是可参数化的。

[该贴被banq于2013-10-26 08:51修改过]