Rx (Reactive Extensions)介绍

                   
banq 13-10-24

Reactive Extensions (Rx) 原来是由微软提出的一个综合了异步和基于事件驱动编程的库包,使用可观察序列和LINQ-style查询操作。

使用Rx, 开发者可以用Observables来表达异步数据流,使用LinQ操作查询异步数据流,使用Schedulers参数化异步数据流中并发。
Rx = Observables + LINQ + Schedulers.

Rx最显著的特性是使用可观察集合(Observable Collection)来达到集成异步(composing asynchronous)和基于事件(event-based)的编程的效果。

使用Rx, 对于无论来自哪里的多少异步数据流,比如来自股票,Twitter的帖子,或计算机事件,或Web服务的请求等等,只要使用接口 IObserver<T>来订阅这些事件流,然后使用接口 IObservable<T> 在事件发生时提醒通知这些订阅者。

因为可观察的序列是数据流,你能使用标准的LINQ来查询它们,这是通过Observable的扩展方法. 这样你能过滤 聚合 组合分析这些事件。

Rx实现由如下项目:

Rx.NET:微软自己的Rx实现。
RxJS: javascript扩展
RxCpp: C 和 C++.扩展
Rx.rb: ruby
RxPy: Python 3 (Rx.Py)

RxJava:是Netflix推出的一个基于JVM的Microsoft Reactive Extensions 扩展,提供Java, Scala, Clojure, 和 Groovy语言实现。

Rx (Reactive Extensions) - Home

[该贴被banq于2013-10-24 11:33修改过]

3
banq
2013-10-24 11:38

下面以一段根据用户输入实现即时搜索代码为例子

public class MemberSearchViewModel : INotifyPropertyChanged
{
//Fields removed...
public MemberSearchViewModel(IMemberSearchModel memberSearchModel,ISchedulerProvider schedulerProvider)
{
_ memberSearchModel = memberSearchModel;
//Run search when SearchText property changes
this.PropertyChanges(vm => vm.SearchText).Subscribe(Search);
}

//Assume INotifyPropertyChanged implementations of properties...
public string SearchText { get; set; }
public bool IsSearching { get; set; }
public string Error { get; set; }
public ObservableCollection<string> Results { get; }

//Search on background thread and return result on dispatcher.
private void Search(string searchText)
{
using (_currentSearch) { }
IsSearching = true;
Results.Clear();
Error = null;
_currentSearch = _memberSearchModel.SearchMembers(searchText)
.Timeout(TimeSpan.FromSeconds(2))
.SubscribeOn(_schedulerProvider.TaskPool)
.ObserveOn(_schedulerProvider.Dispatcher)
.Subscribe(
Results.Add,
ex =>
{
IsSearching = false;
Error = ex.Message;
},
() => { IsSearching = false; });
}
...
}

优点:
1.维持了一个响应式UI(responsive UI)
2.支持timeouts
3.当搜索结束能够知晓
4.允许结果能够一次性返回。
5.能够处理错误
6.可单元测试,支持并发
7.如果用户改变搜索,能够立即取消当前搜索,对新文本输入执行新的搜索。


关于同样这个案例,可参考Javascript的RxJs实现:
http://www.jdon.com/45829

点评:
Rx是一种事件驱动+函数的编程风格,内部机制与Java的队列原理或DIsruptor非常相似,关键是无锁无堵塞,主要区别是对队列中事件的读取方式上,Rx采取LinQ风格。

Rx是处理事件流,也就是事件的集合,不是单个事件,个人认为也非常适合在app客户端或浏览器中实现与后端之间的实现异步交互。前后端都引入Rx,系统的吞吐量和并发性都会得到大的提升。

参考:
什么是函数响应式编程(Functional Reactive Programming:FRP)

reactive宣言

[该贴被banq于2013-10-24 14:01修改过]

bingyang
2013-10-25 11:18

关于EDA这种架构模式具体实现问题讨论
[该贴被bingyang于2013-10-25 14:04修改过]
[该贴被bingyang于2013-10-25 14:04修改过]

banq
2013-10-25 12:14

根据Netflix的Jafar Husain, Netflix: End to end Reactive Programming 端到端的reactive编程中介绍:

微软的Erik Meijer研究:数据库的查询和鼠标点击事件有什么区别?

答案是:其实他们没有区别,都是集合Collection。

如果你是一个个体和集合敏感的人会同意这个观点,数据库查询是一个集合,我们需要结果时要遍历这个集合取出显示,这是从后端向前端读取的方向;而从前端向后端发出的是鼠标事件,操作者操作界面发出的事件,这也是一个集合,在后端我们要遍历这个事件集合,逐个进行写操作等处理。

javascript的ES6引入了闭包:
var add = function(x,y) { return x + y; }
变成了
var add = (x,y) => x + y

下面以具体代码看看,获取查询集合和获取鼠标事件集合在代码上区别?

查询一个好看的影片集合,然后显示的代码如下:


var getTopRatedFilms = user =>
user.videoLists.
map(videoList =>
videoList.videos.
filter(video => video.rating === 5.0)).
flatten();

getTopRatedFilms(user).
forEach(film => console.log(film));


获得鼠标点击事件的代码如下:


var getElementDrags = elmt =>
elmt.mouseDowns.
map(mouseDown =>
document.mouseMoves.
filter takeUntil(document.mouseUps)).
flatten();

getElementDrags(image).
forEach(pos => image.position = pos);


我们发现这两个代码是何其相似?

既然相似,设计模式的习惯告诉我们,如果发生两个以上的重复工作,那么就要联想是否使用模型来提取?

这两行代码实际分别代表Gof模式中Iterator模式和Observer观察者模式。

Iterator<T>有三个方法:
next()
boolean hasNext()
throw new Throwable()

而Observer<T>也有三个方法:
onNext(T)
onCompleted()
onError(Throwable)

两者是重复的,因此推论可以使用一个新的更抽象的模式来表达它们。

@bingyang 你的问题最好重开一贴,本帖主要讨论Rx,Rx是实现EDA途径之一。

bingyang
2013-10-25 14:02

2013-10-25 12:14 "@banq"的内容
@bingyang 你的问题最好重开一贴,本帖主要讨论Rx,Rx是实现EDA途径之一 ...



好的 ,已发新帖 http://www.jdon.com/45835/nocache#23143698

2Go 1 2 下一页