Rx (Reactive Extensions)介绍

13-10-24 banq
Reactive Extensions (Rx) 原来是由微软提出的一个综合了异步和基于事件驱动编程的库包,使用可观察序列和LINQ-style查询操作。

使用Rx, 开发者可以用Observables来表达异步数据流,使用LinQ操作查询异步数据流,使用Schedulers参数化异步数据流中并发。

Rx = Observables + LINQ + Schedulers.

Rx最显著的特性是使用可观察集合(Observable Collection)来达到集成异步(composing asynchronous)和基于事件(event-based)的编程的效果。

使用Rx, 对于无论来自哪里的多少异步数据流,比如来自股票,Twitter的帖子,或计算机事件,或Web服务的请求等等,只要使用接口 IObserver<T>来订阅这些事件流,然后使用接口 IObservable<T> 在事件发生时提醒通知这些订阅者。

因为可观察的序列是数据流,你能使用标准的LINQ来查询它们,这是通过Observable的扩展方法. 这样你能过滤 聚合 组合分析这些事件。

Rx实现由如下项目:

Rx.NET:微软自己的Rx实现。

RxJS: javascript扩展

RxCpp: C 和 C++.扩展

Rx.rb: ruby

RxPy: Python 3 (Rx.Py)

RxJava:是Netflix推出的一个基于JVM的Microsoft Reactive Extensions 扩展,提供Java, Scala, Clojure, 和 Groovy语言实现。

Rx (Reactive Extensions) - Home

[该贴被banq于2013-10-24 11:33修改过]

3
banq
2013-10-24 11:38
下面以一段根据用户输入实现即时搜索代码为例子

public class MemberSearchViewModel : INotifyPropertyChanged
{
   //Fields removed...
   public MemberSearchViewModel(IMemberSearchModel memberSearchModel,ISchedulerProvider schedulerProvider)
  {
     _ memberSearchModel = memberSearchModel;
      //Run search when SearchText property changes
      this.PropertyChanges(vm => vm.SearchText).Subscribe(Search);
}

//Assume INotifyPropertyChanged implementations of properties...
public string SearchText { get; set; }
public bool IsSearching { get; set; }
public string Error { get; set; }
public ObservableCollection<string> Results { get; }

//Search on background thread and return result on dispatcher.
private void Search(string searchText)
{
   using (_currentSearch) { }
          IsSearching = true;
          Results.Clear();
          Error = null;
        _currentSearch = _memberSearchModel.SearchMembers(searchText)
                   .Timeout(TimeSpan.FromSeconds(2))
                   .SubscribeOn(_schedulerProvider.TaskPool)
                   .ObserveOn(_schedulerProvider.Dispatcher)
                   .Subscribe(
                             Results.Add,
                             ex =>
                              {
                                        IsSearching = false;
                                        Error = ex.Message;
                              },
                               () => { IsSearching = false; });
}
...
}
<p>

优点:

1.维持了一个响应式UI(responsive UI)

2.支持timeouts

3.当搜索结束能够知晓

4.允许结果能够一次性返回。

5.能够处理错误

6.可单元测试,支持并发

7.如果用户改变搜索,能够立即取消当前搜索,对新文本输入执行新的搜索。

关于同样这个案例,可参考Javascript的RxJs实现:

http://www.jdon.com/45829

点评:

Rx是一种事件驱动+函数的编程风格,内部机制与Java的队列原理或DIsruptor非常相似,关键是无锁无堵塞,主要区别是对队列中事件的读取方式上,Rx采取LinQ风格。

Rx是处理事件流,也就是事件的集合,不是单个事件,个人认为也非常适合在app客户端或浏览器中实现与后端之间的实现异步交互。前后端都引入Rx,系统的吞吐量和并发性都会得到大的提升。

参考:

什么是函数响应式编程(Functional Reactive Programming:FRP)

reactive宣言

[该贴被banq于2013-10-24 14:01修改过]

bingyang
2013-10-25 11:18
关于EDA这种架构模式具体实现问题讨论

[该贴被bingyang于2013-10-25 14:04修改过]

[该贴被bingyang于2013-10-25 14:04修改过]

banq
2013-10-25 12:14
根据Netflix的Jafar Husain, Netflix: End to end Reactive Programming 端到端的reactive编程中介绍:

微软的Erik Meijer研究:数据库的查询和鼠标点击事件有什么区别?

答案是:其实他们没有区别,都是集合Collection。

如果你是一个个体和集合敏感的人会同意这个观点,数据库查询是一个集合,我们需要结果时要遍历这个集合取出显示,这是从后端向前端读取的方向;而从前端向后端发出的是鼠标事件,操作者操作界面发出的事件,这也是一个集合,在后端我们要遍历这个事件集合,逐个进行写操作等处理。

javascript的ES6引入了闭包:

var add = function(x,y) { return x + y; }

变成了

var add = (x,y) => x + y

下面以具体代码看看,获取查询集合和获取鼠标事件集合在代码上区别?

查询一个好看的影片集合,然后显示的代码如下:

var getTopRatedFilms = user =>
         user.videoLists.
             map(videoList =>
                      videoList.videos.
                           filter(video => video.rating === 5.0)).
            flatten();

getTopRatedFilms(user).
      forEach(film => console.log(film));
<p>

获得鼠标点击事件的代码如下:

var getElementDrags = elmt =>
      elmt.mouseDowns.
           map(mouseDown =>
                   document.mouseMoves.
                         filter takeUntil(document.mouseUps)).
          flatten();

getElementDrags(image).
       forEach(pos => image.position = pos);
<p>

我们发现这两个代码是何其相似?

既然相似,设计模式的习惯告诉我们,如果发生两个以上的重复工作,那么就要联想是否使用模型来提取?

这两行代码实际分别代表Gof模式中Iterator模式和Observer观察者模式。

Iterator<T>有三个方法:

next()

boolean hasNext()

throw new Throwable()

而Observer<T>也有三个方法:

onNext(T)

onCompleted()

onError(Throwable)

两者是重复的,因此推论可以使用一个新的更抽象的模式来表达它们。

@bingyang 你的问题最好重开一贴,本帖主要讨论Rx,Rx是实现EDA途径之一。

bingyang
2013-10-25 14:02
2013-10-25 12:14 "@banq"的内容
@bingyang 你的问题最好重开一贴,本帖主要讨论Rx,Rx是实现EDA途径之一 ...

好的 ,已发新帖 http://www.jdon.com/45835/nocache#23143698

banq
2013-10-26 08:45
Conquering concurrency - bringing the Reactive Ext这最新PPT诠释了如何在客户端安卓上使用Reactive Extension。

在安卓平台上,通常有几个问题难以优雅实现,如ResultReceiver结果接受,Custom Listeners BroadcastReceriver广播接收 ContentObserver争夺观察 加载回调Loader callbacks和Pull-to-refresh callbacks等。

而基于JVM的函数式reactive编程如RxJava能够解决这些问题。

在传统imperative编程范式下:

如果 int x=1; int y=x+1;

那么当x=2时,y是多少呢?还是2。

而在reactive编程范式下:

如果int x=1; Func(int) y = () -->{x+1}

那么当x=2时,y将会是3, y值能够跟随x即时反应变化。

传统imperative编程 + declarative声明+lazy evaluation懒赋值=Reactive编程

Reactive编程+higher order functions高阶函数 composition组合=

Functional reactive programming(简称FRP 函数式反应编程)。

那么这些对于移动应用意味着什么呢?

首先,目前事实是,UI驱动的应用都是很自然地基于事件和响应reactive的;其次,数据都是来自后端Web。

那么我们的编程风格也应该反应这种事实:

1. 异步 声明式的API (ask to construct)

2.事件作为可观察的序列。

3.积极拥抱面对失败,不回避。

原来的安卓本身提供的异步任务AsyncTask如何?

1.单线程,使用Future+Handler,而Futuer在复杂嵌套情况下混乱。

2.非常易于泄漏Context

3.无出错专门处理

4.非组合式。

那么一些EventBus如何?

1.没有内建出错处理。

2.事件并不能组合嵌套。

3.设计为全部 共享状态。

再看看RxJava,主要接口有可观察Observable和观察者Observer。

Observable能够做事件作为可观察的序列:

Observable.create((observer) -> {
         for (int i = 1; i <= 3; i++) {
            observer.onNext(i);
         }
        observer.onCompleted();
}).subscribe(intObserver);
//发出结果值是 1, 2, 3
<p>

Observer观察者代码如下:

Observer<Integer> intObserver = new Observer<Integer> {
    public void onNext(Integer value) {
          System.out.println(value);
    }
    public void onCompleted() {
           System.out.println(“Done!”);
    }
    public void onError(Throwable t) { … }
}
<p>

组合Composition实现如下:

// Observable from previous example
observable.map((i) -> { return i * 2; }).subscribe(...)

//标准输出:
2
4
6
Done!
<p>

Schedulers用来实现参数化的并发:

// observable from previous example
observable
    .subscribeOn(Schedulers.newThread())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(intObserver);
<p>

具体在安卓中使用:

将服务Service暴露为Observable<T>

实现一些Reactive组件,如adapters

比如从Service中获得结果放入Observer中:

//e.g. in onCreate
Observable<Track> observable =
           AndroidObservables.fromFragment(
                   this, service.loadTracks())
                       .subscribe(this)
<p>

其中service.loadTracks代码:

public Observable<Track> loadTracks() {
APIRequest<Track> request = /* build request */
return mRxHttpClient.fetchModels(request);
<p>

而其中fechModel代码如下:

public Observable<T> fetchModels(APIRequest request) {
    return fetchResponse(request).mapMany((response) -> {
          return mapResponseToModels(request, response);
     });
}
<p>

好处:

1.简单统一的事件模型

onNext* → onCompleted | onError

2. 重用: 异步任务组合的声明定义

3. 易于测试: 并发是可参数化的。

[该贴被banq于2013-10-26 08:48修改过]

banq
2013-10-27 10:20
Reactor - A Foundation for Reactive FastData Appli该文简单介绍了Spring reactor 1.0的基本特性。

目前reactor是作为Spring.io核心包下面项目。

Reactor 是一个基础性库包

–定位在用户级和低级之间的灰色区域的抽象。

– 能够在Reactor上建立组件和应用核心

– 驱动器 服务器和数据整合库,领域整合库,事件驱动架构

Reactor的应用是reactive的。

– 属于Reactive Extensions in .NET

– 类似Netflix RxJava

– 观察者模式Observer pattern

Reactor应用基于一个Selector发送事件。

– 象一个消息系统中routing topic, 但是它是一个对象

– 支持Regex, URI template, Class.isAssingableFrom, 定制逻辑

Reactor Core内部封装了LMAX Disruptor的RingBuffer,再通过Reactor-Spring等支持支持各种Spring应用,如下图:


Reactor演示代码

Environment env = new Environment();
Reactor reactor = Reactors.reactor()
                               .env(env)
                               .dispatcher(RING_BUFFER)
                               .get();

reactor.on($(“topic”), (Event<String> ev) → {
                             System.out.println(“Hello “ + ev.getData());
                  });

reactor.notify(“topic”, Event.wrap(“John Doe”));
<p>

RING_BUFFER是Disruptor的RingBuffer操作,熟悉Disruptor的应该知道。

reactor.notify发送一个事件,而reactor.on能够接受到这个事件即时响应。

Reactor 的分发器 Dispatchers 类似Akka的分发器

● 分发器管理任务执行,有下面几种:

– ThreadPoolExecutorDispatcher

● 标准的 ThreadPoolExecutor

– BlockingQueueDispatcher

● 能够进行事件轮询

– RingBufferDispatcher

● LMAX Disruptor RingBuffer

– SynchronousDispatcher

Reactor的 Selectors

● Selectors 是一个等式的左边。

– 一个Selector能够被任何对象使用$(obj)创建

(或者: Selectors.object(obj))

– 一个Selector能够从匹配的key中释放数据

– Predicate<T> Selectors 能够创建匹配特定领域准则

(domain-specific criteria)

比如RegexSelector:

reactor.on(R(“some.(.+)”), (Event<String> ev) → {

// s will be 'topic'

String s = ev.getHeaders().get(“group1”);

});

reactor.notify(“some.topic”, Event.wrap(“John Doe”));

其中R(“some.(.*)”)匹配事件发送者“some.topic”。

UriTemplateSelector能够从URI匹配字符串:

reactor.on(U(“/some/{topic}”), (Event<String> ev) → {

// s will be 'topic'

String s = ev.getHeaders().get(“topic”);

});

reactor.notify(“/some/topic”, Event.wrap(“John Doe”));

Reactor 的Stream

● Streams允许基于数据的函数组合composition

– Callback++

– 类似Netflix RxJava Observable, JDK 8 Stream

Stream<String> str;
str.map(String::toUpperCase)
     .filter(new Predicate<String>() {
               public boolean test(String s) { … }
     })
    .consume(s → log.info(“consumed string {}”, s));
<p>

Reactor 的 Promise

允许在Stream之间分享函数

Promise<String> p;
String s = p
        .onSuccess(s → log.info(“consumed string {}”, s))
        .onFailure(t → log.error(t.getMessage(), t))
        .onComplete(t → log.info(“complete”))
        .await(5, SECONDS);

p.map(String::toUpperCase).consume(s → log.info(“UC: {}”, s));
<p>

Reactor 的 Processor

干脆直接将Disruptor API转为Reactor API

对于UberFastData有超级快性能

Processor<Buffer> proc;
Operation<Buffer> op = proc.prepare();
op.get().append(data).flip();
op.commit();
proc.batch(512, buff → buff.append(data).flip());
<p>

与Spring整合:

首先使用@EnableReactor 激活reactor

@Configuration
@EnableReactor
public class ReactorConfiguration {

  @Bean
   public Reactor input(Environment env) {
        return Reactors.reactor().env(env)
                  .dispatcher(RING_BUFFER).get();
   }

   @Bean
   public Reactor output(Environment env) {
        return Reactors.reactor().env(env)
                  .dispatcher(RING_BUFFER).get();
}
<p>

然后在监听者或观察者写入:

@Component
public class SimpleHandler {
    @Autowired
    private Reactor reactor;

    @Selector(“test.topic”)
    public void onTestTopic(String s) {
             // Handle data
    }
}
<p>

reactor的groovy整合:

@CompileStatic
def welcome(){
    reactor.on('greetings') { String s ->
            reply “hello $s”
            reply “how are you?”
}
reactor.notify 'greetings', 'Jon'
           reactor.send('greetings', 'Stephane'){
                  println it
            cancel()
           }
}
<p>

[该贴被banq于2013-10-27 10:21修改过]

oceannut
2013-10-29 14:56
这好比资源的生产和消费:

Rx之前:在iterator模式下,生产者先加工完毕产品,然后由消费者通过next和hasnext来消费产品;或者生产者不断的生产产品,而消费者用pull的手段获取产品,也不确定产品何时存在或何时生产完毕,只能耐心等待或不断的去问询。

有了Rx,由生产者以push的手段通知消费者(onnext),消费者不比问询也不必等待,爱干啥去干啥,反正产品生产好了,由生产者来提供给消费者;而且服务更周到的是,生产完成了,生产者还要告诉消费者(oncompleted)。

banq
2013-10-29 15:10
2013-10-29 14:56 "@oceannut"的内容
这好比资源的生产和消费 ...

解析得很有道理,与DI依赖注入有某种程度相似。

IOC容器将资源类一次性注入。这个注入其实也是一种推,现在通过Rx观察者,这种推变成一种推的流,可以持续不断地推,类似子弹不断自动上膛,不得不发一样。

当我们编程时被这两种力量推走时,也许感觉非常简单方便,如同一坐到餐桌边,服务员就不断送毛巾 加茶水,享受被服务的感觉很好。

猜你喜欢