RxJava教程之事件系列

上篇

  通过前面章节我们基本上解释了RxJava的几种类型和订阅与取消订阅等生命周期的控制方式,现在回到我们使用RxJava本义上:事件系列sequence。事件系列是指发生的一系列事件,它是一个数值序列,类似数组队列等等数据类型,使用RxJava是对这些事件进行观察订阅,以激活相应的动作。

原来的事件序列是基于 C#的 LINQ,实现,这是受函数编程思想的启发,这里不说太多函数编程的理论知识,我们看看Rx是如何操作已经存在的序列的,首先我们看看如何创建一个可观察者Observable。

创建一个事件系列

  Observable.just是创建一个Observable实例,它会发送一个预先定义好的数值序列:

Observable< String> values = Observable.just("one ", "two ", "three ");
Subscription subscription = values.subscribe(
    v -> System.out.println("Received:  " + v),
    e -> System.out.println("Error:  " + e),
    () -> System.out.println("Completed ")
);  

输出:

Received: one
Received: two
Received: three
Completed 

请注意,在这里我们已经没有如之前章节使用onNext(XX)来发送事件数值,而是一次性定义好一个事件数值序列,分别是one two three,通过前面章节讲解的订阅控制方式,我们这里的订阅提供三个函数方法,打印输出;如果错误输出Error;如果完成输出Completed。

  我们再看看Observable.empty,它是创建一个可观察者,只发射onCompleted ,其他什么也不发送:

Observable< String> values = Observable.empty();
Subscription subscription = values.subscribe(
    v -> System.out.println("Received:  " + v),
    e -> System.out.println("Error:  " + e),
    () -> System.out.println("Completed ")
);

上面代码只会输出:Completed

  再看看Observable.never,它是从不发射任何东西:

Observable< String> values = Observable.never();
Subscription subscription = values.subscribe(
    v -> System.out.println("Received:  " + v),
    e -> System.out.println("Error:  " + e),
    () -> System.out.println("Completed ")
);

上述代码将什么也不会输出。 但是并不意味着程序在这里堵塞等待了,而是直接中断了。

  Observable.error是发射一个错误事件后然后中断:

Observable< String> values = Observable.error(new Exception("Oops "));
Subscription subscription = values.subscribe(
    v -> System.out.println("Received:  " + v),
    e -> System.out.println("Error:  " + e),
    () -> System.out.println("Completed ")
);

输出:Error: java.lang.Exception: Oops

  Observable.defer并不定义一种新的可观察者类型创建,而是允许你声明一个可观察者当有订阅者时如何被创建。这个我们可以通过just方式来模拟解释:

Observable< Long> now = Observable.just(System.currentTimeMillis());
now.subscribe(System.out::println);
Thread.sleep(1000);
now.subscribe(System.out::println);

输出:

1431443908375
1431443908375 

这里just创建当前时间的可观察者,也就是说,这个可观察者只发送一个当前时间的数值,然后就中断退出了。 注意输出的这两个时间,完全是一样的时间,但是我们在代码中有sleep(1000)等待时间的,应该第二个时间会比第一个时间延迟1000,这是因为时间数值的获得是一次性的。如果我们想获得延迟1000的效果,就要使用defer了,subscribing.defer 返回的一个可观察者不是如同just创建那样是一次性的,而是对于每个新的订阅都会再创建返回一次当前时间:

Observable< Long> now = Observable.defer(() -> 
        Observable.just(System.currentTimeMillis()));
now.subscribe(System.out::println);
Thread.sleep(1000);
now.subscribe(System.out::println);

输出:

1431444107854
1431444108858 

  Observable.create是一个创建可观察者的强大函数,其方法代码如:

static <T> Observable<T> create(Observable.OnSubscribe<T> f)

其创建方法的输入参数 Observable.OnSubscribe<T>比看上去还很简单,它实际是一个带有输入参数Subscriber<T>的方法,这个T中我们能手工决定那些发送给订阅者的事件系列。

Observable< String> values = Observable.create(o -> {
    o.onNext("Hello ");
    o.onCompleted();
});
Subscription subscription = values.subscribe(
    v -> System.out.println("Received:  " + v),
    e -> System.out.println("Error:  " + e),
    () -> System.out.println("Completed ")
);
输出结果:
Received: Hello
Completed 

这里的方式非常类似于我们在前面几章的案例举例,在这里我们将事件序列创建和订阅有机结合起来,被观察者和观察者完整联系起来了。

当有人订阅这个可观察者(这里是values对象),相应的订阅者实例已经被传送到你的函数,当这段代码执行时,values事件系列将被发送到订阅者,请注意,你必须自己最后调用onCompleted表示事件系列发送结束了。

这段代码非常类似于我们创建一个Subject然后将values推送给它,但是和Subject还是有些重要区别:首先,所有的事件源是干净地被封装,并且和相关代码分离,其次, Subject有一个潜在的危险:使用Subject你管理状态时,任何只要访问Subject实例的人都可以将values发送给它,从而调整改变了事件序列。

另外一个关键区别是:与使用Subject相比,上述代码是懒惰执行的,在可观察者observable被创建时不会运行,因为还没有订阅者,当只有一个观察者订阅时才执行。这意味着对每个订阅者都会产生一次每个事件数值,这类似于前面Subject章节的ReplaySubject,但是使用ReplaySubject时会有时间消耗,因为它会堵塞执行创建的线程,你需要手工创建另外一个新线程用来将数值发送给Subject。

最后,你可能发现:前面的 Observable.create使用方式其实类似于Observable.just("hello").

 

函数化展现unfold

  在函数编程中,创建一个不受限制也就是无限长的序列是很常见的,RxJava有工厂方法这么做的:

Observable.range发射整数的集合:

Observable< Integer> values = Observable.range(10, 15);

具体测试代码:

        TestSubscriber<Integer> tester = new TestSubscriber<Integer>();

       

        Observable<Integer> values = Observable.range(10, 15);

        values.subscribe(tester);

       

        tester.assertReceivedOnNext(Arrays.asList(10,11,12,13,14,15,16,17,18,19,20,21,22,23,24));

        tester.assertTerminalEvent();

        tester.assertNoErrors();

这里rx.observers.TestSubscriber是Rx的一个用于测试的观察者,上述代码可以接受10到15长度范围内的事件序列。

Observable.interval是创建一个一定时间间隔的无限序列:

Observable< Long> values = Observable.interval(1000, TimeUnit.MILLISECONDS);
Subscription subscription = values.subscribe(
    v -> System.out.println("Received:  " + v),
    e -> System.out.println("Error:  " + e),
    () -> System.out.println("Completed ")
);
System.in.read();

输出结果:

Received: 0
Received: 1
Received: 2
Received: 3

这个事件序列不会中断,除非取消订阅,否则随着时间一直运行下去。

我们注意到代码最后有一个堵塞读操作:System.in.read();没有这行,程序将会不会打印任何信息而中断,这是因为我们的操作是非堵塞的,我们创建一个随着时间发射数值的可观察者,然后我们注册了一个动作来响应,只有当真正有数值事件时才会执行,如果这里没有堵塞代码,主线程将会结束了,因为产生发送事件序列是在另外一个自己的线程中,不是在主线程中执行。

Observable.timer是创建一个可观察者,等待一段时间后再发射事件,然后中断:

Observable< Long> values = Observable.timer(1, TimeUnit.SECONDS);
Subscription subscription = values.subscribe(
    v -> System.out.println("Received:  " + v),
    e -> System.out.println("Error:  " + e),
    () -> System.out.println("Completed ")
);
输出:
Received: 0
Completed

 

Reactive编程专题

更多RxJava专题