Rx (Reactive Extensions)介绍

Reactive Extensions (Rx) 原来是由微软提出的一个综合了异步和基于事件驱动编程的库包,使用可观察序列和LINQ-style查询操作。

使用Rx, 开发者可以用Observables来表达异步数据流,使用LinQ操作查询异步数据流,使用Schedulers参数化异步数据流中并发。
Rx = Observables + LINQ + Schedulers.

Rx最显著的特性是使用可观察集合(Observable Collection)来达到集成异步(composing asynchronous)和基于事件(event-based)的编程的效果。

使用Rx, 对于无论来自哪里的多少异步数据流,比如来自股票,Twitter的帖子,或计算机事件,或Web服务的请求等等,只要使用接口 IObserver<T>来订阅这些事件流,然后使用接口 IObservable<T> 在事件发生时提醒通知这些订阅者。

因为可观察的序列是数据流,你能使用标准的LINQ来查询它们,这是通过Observable的扩展方法. 这样你能过滤 聚合 组合分析这些事件。

Rx实现由如下项目:

Rx.NET:微软自己的Rx实现。
RxJS: javascript扩展
RxCpp: C 和 C++.扩展
Rx.rb: ruby
RxPy: Python 3 (Rx.Py)

RxJava:是Netflix推出的一个基于JVM的Microsoft Reactive Extensions 扩展,提供Java, Scala, Clojure, 和 Groovy语言实现。

Rx (Reactive Extensions) - Home

[该贴被banq于2013-10-24 11:33修改过]

下面以一段根据用户输入实现即时搜索代码为例子


public class MemberSearchViewModel : INotifyPropertyChanged
{
//Fields removed...
public MemberSearchViewModel(IMemberSearchModel memberSearchModel,ISchedulerProvider schedulerProvider)
{
_ memberSearchModel = memberSearchModel;
//Run search when SearchText property changes
this.PropertyChanges(vm => vm.SearchText).Subscribe(Search);
}

//Assume INotifyPropertyChanged implementations of properties...
public string SearchText { get; set; }
public bool IsSearching { get; set; }
public string Error { get; set; }
public ObservableCollection<string> Results { get; }

//Search on background thread and return result on dispatcher.
private void Search(string searchText)
{
using (_currentSearch) { }
IsSearching = true;
Results.Clear();
Error = null;
_currentSearch = _memberSearchModel.SearchMembers(searchText)
.Timeout(TimeSpan.FromSeconds(2))
.SubscribeOn(_schedulerProvider.TaskPool)
.ObserveOn(_schedulerProvider.Dispatcher)
.Subscribe(
Results.Add,
ex =>
{
IsSearching = false;
Error = ex.Message;
},
() => { IsSearching = false; });
}
...
}

优点:
1.维持了一个响应式UI(responsive UI)
2.支持timeouts
3.当搜索结束能够知晓
4.允许结果能够一次性返回。
5.能够处理错误
6.可单元测试,支持并发
7.如果用户改变搜索,能够立即取消当前搜索,对新文本输入执行新的搜索。


关于同样这个案例,可参考Javascript的RxJs实现:
http://www.jdon.com/45829

点评:
Rx是一种事件驱动+函数的编程风格,内部机制与Java的队列原理或DIsruptor非常相似,关键是无锁无堵塞,主要区别是对队列中事件的读取方式上,Rx采取LinQ风格。

Rx是处理事件流,也就是事件的集合,不是单个事件,个人认为也非常适合在app客户端或浏览器中实现与后端之间的实现异步交互。前后端都引入Rx,系统的吞吐量和并发性都会得到大的提升。

参考:
什么是函数响应式编程(Functional Reactive Programming:FRP)

reactive宣言

[该贴被banq于2013-10-24 14:01修改过]

根据Netflix的Jafar Husain, Netflix: End to end Reactive Programming 端到端的reactive编程中介绍:

微软的Erik Meijer研究:数据库的查询和鼠标点击事件有什么区别?

答案是:其实他们没有区别,都是集合Collection。

如果你是一个个体和集合敏感的人会同意这个观点,数据库查询是一个集合,我们需要结果时要遍历这个集合取出显示,这是从后端向前端读取的方向;而从前端向后端发出的是鼠标事件,操作者操作界面发出的事件,这也是一个集合,在后端我们要遍历这个事件集合,逐个进行写操作等处理。

javascript的ES6引入了闭包:
var add = function(x,y) { return x + y; }
变成了
var add = (x,y) => x + y

下面以具体代码看看,获取查询集合和获取鼠标事件集合在代码上区别?

查询一个好看的影片集合,然后显示的代码如下:


var getTopRatedFilms = user =>
user.videoLists.
map(videoList =>
videoList.videos.
filter(video => video.rating === 5.0)).
flatten();

getTopRatedFilms(user).
forEach(film => console.log(film));

获得鼠标点击事件的代码如下:


var getElementDrags = elmt =>
elmt.mouseDowns.
map(mouseDown =>
document.mouseMoves.
filter takeUntil(document.mouseUps)).
flatten();

getElementDrags(image).
forEach(pos => image.position = pos);

我们发现这两个代码是何其相似?

既然相似,设计模式的习惯告诉我们,如果发生两个以上的重复工作,那么就要联想是否使用模型来提取?

这两行代码实际分别代表Gof模式中Iterator模式和Observer观察者模式。

Iterator<T>有三个方法:
next()
boolean hasNext()
throw new Throwable()

而Observer<T>也有三个方法:
onNext(T)
onCompleted()
onError(Throwable)

两者是重复的,因此推论可以使用一个新的更抽象的模式来表达它们。

@bingyang 你的问题最好重开一贴,本帖主要讨论Rx,Rx是实现EDA途径之一。

2013-10-25 12:14 "@banq"的内容
@bingyang 你的问题最好重开一贴,本帖主要讨论Rx,Rx是实现EDA途径之一 ...


好的 ,已发新帖 http://www.jdon.com/45835/nocache#23143698

Conquering concurrency - bringing the Reactive Ext这最新PPT诠释了如何在客户端安卓上使用Reactive Extension。

在安卓平台上,通常有几个问题难以优雅实现,如ResultReceiver结果接受,Custom Listeners BroadcastReceriver广播接收 ContentObserver争夺观察 加载回调Loader callbacks和Pull-to-refresh callbacks等。

而基于JVM的函数式reactive编程如RxJava能够解决这些问题。

在传统imperative编程范式下:
如果 int x=1; int y=x+1;
那么当x=2时,y是多少呢?还是2。

而在reactive编程范式下:
如果int x=1; Func(int) y = () -->{x+1}
那么当x=2时,y将会是3, y值能够跟随x即时反应变化。

传统imperative编程 + declarative声明+lazy evaluation懒赋值=Reactive编程

Reactive编程+higher order functions高阶函数 composition组合=
Functional reactive programming(简称FRP 函数式反应编程)。

那么这些对于移动应用意味着什么呢?

首先,目前事实是,UI驱动的应用都是很自然地基于事件和响应reactive的;其次,数据都是来自后端Web。

那么我们的编程风格也应该反应这种事实:
1. 异步 声明式的API (ask to construct)
2.事件作为可观察的序列。
3.积极拥抱面对失败,不回避。

原来的安卓本身提供的异步任务AsyncTask如何?
1.单线程,使用Future+Handler,而Futuer在复杂嵌套情况下混乱。
2.非常易于泄漏Context
3.无出错专门处理
4.非组合式。

那么一些EventBus如何?
1.没有内建出错处理。
2.事件并不能组合嵌套。
3.设计为全部 共享状态。

再看看RxJava,主要接口有可观察Observable和观察者Observer。
Observable能够做事件作为可观察的序列:


Observable.create((observer) -> {
for (int i = 1; i <= 3; i++) {
observer.onNext(i);
}
observer.onCompleted();
}).subscribe(intObserver);
//发出结果值是 1, 2, 3

Observer观察者代码如下:


Observer<Integer> intObserver = new Observer<Integer> {
public void onNext(Integer value) {
System.out.println(value);
}
public void onCompleted() {
System.out.println(“Done!”);
}
public void onError(Throwable t) { … }
}

组合Composition实现如下:


// Observable from previous example
observable.map((i) -> { return i * 2; }).subscribe(...)

//标准输出:
2
4
6
Done!

Schedulers用来实现参数化的并发:


// observable from previous example
observable
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(intObserver);

具体在安卓中使用:
将服务Service暴露为Observable<T>
实现一些Reactive组件,如adapters

比如从Service中获得结果放入Observer中:


//e.g. in onCreate
Observable<Track> observable =
AndroidObservables.fromFragment(
this, service.loadTracks())
.subscribe(this)

其中service.loadTracks代码:


public Observable<Track> loadTracks() {
APIRequest<Track> request = /* build request */
return mRxHttpClient.fetchModels(request);

而其中fechModel代码如下:


public Observable<T> fetchModels(APIRequest request) {
return fetchResponse(request).mapMany((response) -> {
return mapResponseToModels(request, response);
});
}

好处:
1.简单统一的事件模型
onNext* → onCompleted | onError
2. 重用: 异步任务组合的声明定义
3. 易于测试: 并发是可参数化的。


[该贴被banq于2013-10-26 08:48修改过]

Reactor - A Foundation for Reactive FastData Appli该文简单介绍了Spring reactor 1.0的基本特性。

目前reactor是作为Spring.io核心包下面项目。

Reactor 是一个基础性库包
–定位在用户级和低级之间的灰色区域的抽象。
– 能够在Reactor上建立组件和应用核心
– 驱动器 服务器和数据整合库,领域整合库,事件驱动架构

Reactor的应用是reactive的。
– 属于Reactive Extensions in .NET
– 类似Netflix RxJava
– 观察者模式Observer pattern

Reactor应用基于一个Selector发送事件。
– 象一个消息系统中routing topic, 但是它是一个对象
– 支持Regex, URI template, Class.isAssingableFrom, 定制逻辑

Reactor Core内部封装了LMAX Disruptor的RingBuffer,再通过Reactor-Spring等支持支持各种Spring应用,如下图:


Reactor演示代码


Environment env = new Environment();
Reactor reactor = Reactors.reactor()
.env(env)
.dispatcher(RING_BUFFER)
.get();

reactor.on($(“topic”), (Event<String> ev) → {
System.out.println(“Hello “ + ev.getData());
});

reactor.notify(“topic”, Event.wrap(“John Doe”));

RING_BUFFER是Disruptor的RingBuffer操作,熟悉Disruptor的应该知道。

reactor.notify发送一个事件,而reactor.on能够接受到这个事件即时响应。

Reactor 的分发器 Dispatchers 类似Akka的分发器
● 分发器管理任务执行,有下面几种:
– ThreadPoolExecutorDispatcher
● 标准的 ThreadPoolExecutor

– BlockingQueueDispatcher
● 能够进行事件轮询

– RingBufferDispatcher
● LMAX Disruptor RingBuffer

– SynchronousDispatcher


Reactor的 Selectors
● Selectors 是一个等式的左边。
– 一个Selector能够被任何对象使用$(obj)创建
(或者: Selectors.object(obj))
– 一个Selector能够从匹配的key中释放数据
– Predicate<T> Selectors 能够创建匹配特定领域准则
(domain-specific criteria)

比如RegexSelector:
reactor.on(R(“some.(.+)”), (Event<String> ev) → {
// s will be 'topic'
String s = ev.getHeaders().get(“group1”);
});

reactor.notify(“some.topic”, Event.wrap(“John Doe”));

其中R(“some.(.*)”)匹配事件发送者“some.topic”。

UriTemplateSelector能够从URI匹配字符串:
reactor.on(U(“/some/{topic}”), (Event<String> ev) → {
// s will be 'topic'
String s = ev.getHeaders().get(“topic”);
});
reactor.notify(“/some/topic”, Event.wrap(“John Doe”));

Reactor 的Stream
● Streams允许基于数据的函数组合composition
– Callback++
– 类似Netflix RxJava Observable, JDK 8 Stream


Stream<String> str;
str.map(String::toUpperCase)
.filter(new Predicate<String>() {
public boolean test(String s) { … }
})
.consume(s → log.info(“consumed string {}”, s));

Reactor 的 Promise
允许在Stream之间分享函数


Promise<String> p;
String s = p
.onSuccess(s → log.info(“consumed string {}”, s))
.onFailure(t → log.error(t.getMessage(), t))
.onComplete(t → log.info(“complete”))
.await(5, SECONDS);

p.map(String::toUpperCase).consume(s → log.info(“UC: {}”, s));

Reactor 的 Processor
干脆直接将Disruptor API转为Reactor API
对于UberFastData有超级快性能


Processor<Buffer> proc;
Operation<Buffer> op = proc.prepare();
op.get().append(data).flip();
op.commit();
proc.batch(512, buff → buff.append(data).flip());

与Spring整合:
首先使用@EnableReactor 激活reactor


@Configuration
@EnableReactor
public class ReactorConfiguration {

@Bean
public Reactor input(Environment env) {
return Reactors.reactor().env(env)
.dispatcher(RING_BUFFER).get();
}

@Bean
public Reactor output(Environment env) {
return Reactors.reactor().env(env)
.dispatcher(RING_BUFFER).get();
}

然后在监听者或观察者写入:


@Component
public class SimpleHandler {
@Autowired
private Reactor reactor;

@Selector(“test.topic”)
public void onTestTopic(String s) {
// Handle data
}
}

reactor的groovy整合:


@CompileStatic
def welcome(){
reactor.on('greetings') { String s ->
reply “hello $s”
reply “how are you?”
}
reactor.notify 'greetings', 'Jon'
reactor.send('greetings', 'Stephane'){
println it
cancel()
}
}



[该贴被banq于2013-10-27 10:21修改过]

这好比资源的生产和消费:
Rx之前:在iterator模式下,生产者先加工完毕产品,然后由消费者通过next和hasnext来消费产品;或者生产者不断的生产产品,而消费者用pull的手段获取产品,也不确定产品何时存在或何时生产完毕,只能耐心等待或不断的去问询。
有了Rx,由生产者以push的手段通知消费者(onnext),消费者不比问询也不必等待,爱干啥去干啥,反正产品生产好了,由生产者来提供给消费者;而且服务更周到的是,生产完成了,生产者还要告诉消费者(oncompleted)。

2013-10-29 14:56 "@oceannut"的内容
这好比资源的生产和消费 ...

解析得很有道理,与DI依赖注入有某种程度相似。

IOC容器将资源类一次性注入。这个注入其实也是一种推,现在通过Rx观察者,这种推变成一种推的流,可以持续不断地推,类似子弹不断自动上膛,不得不发一样。

当我们编程时被这两种力量推走时,也许感觉非常简单方便,如同一坐到餐桌边,服务员就不断送毛巾 加茶水,享受被服务的感觉很好。