RxJava教程

  RxJava是一个通过可观察的序列方式实现异步和基于事件的响应式Reactive编程,对一系列事件能够进行组合和消费使用,其Github地址见这里

  理解RxJava主要首先要了解什么GOF的观察者模式,它是拓展了传统的观察者模式,支持一系列数据或事件,针对这些事件提供响应的激活与操作方法。

  观察者模式有两个主要的角色:可观察者Observable和观察者Observer。可观察者Observable也就是被观察者,它是用来被观察者观察的,观察者一般首先会订阅一些被观察者,也就是说,这些被观察者一旦发生变化,应该及时通知观察者,正如我们订阅杂志一样,一旦有新的杂志会自动发送给我们。

 

被观察者

  RXJava的Observable 是第一个核心类,代表被观察者,其中有一个重要方法订阅:

public final Subscription subscribe(Subscriber<? super T> subscriber)

这是一个用来接收被观察者发出数据的方法,也就是说,被观察者一旦发生变化,其事件值是通过这个方法发送给所有的订阅者(观察者),Subscriber 是接口Observer 的一个实现。

一个可观察者发送三种事件:

  • 数据值
  • 完成Completion, 用来表示再也没有更多数据推送了。
  • 错误Errors, 如果在推送期间发生错误,这一系列事件意味着中断了。

 

观察者

  Observer是RxJava的观察者,Subscriber 是其子类,另外增加了一些额外功能,应该算是观察者的基本实现,所以,我们必须首先了解这个接口:

interface Observer<T> {
    void onCompleted();
    void onError(java.lang .Throwable e);
    void onNext(T t);
}

这三个方法会在可观察者observable每次推送数据值时都会激活执行,onNext方法可任意多次数调用,onCompleted 或 onError是可选的,onCompleted 或 onError只能调用一次。

你可以手工实现Observer接口,或继承Observable,但是实战中通常不必要,因为RxJava已经提供了你需要的很多构建代码。为了订阅一个被观察者Observable,你实际上不必提供Observer实例,这是区别于通常观察者模式。

 

Subject

  Subject主题是可观察者Observable的一个拓展,同时实现了Observer接口,也就是说,通过引入Subject,我们将可观察者和观察者联系起来,这样主要是为了简化,Subject能像观察者那样接受发送给它们的事件,也能像可观察者一样将事件发送给自己的订阅者。Subject能成为RxJava的理想入口,当你有来自Rx外部的事件数据值时,你能将它们推送到一个Subject,把它们转为一个可观察者(被观察者),由此可以作为Rx整个管道连的切入点。这个概念很有函数编程的味道。

Subject有两个参数类型:输入类型和输出类型,这是来自函数编程中纯函数的概念,Subject有许多不同实现,我们看看主要的实现子类:

  1. PublishSubject 是最直接主要的Subject实现,当一个事件值被发送给PublishSubject时,它会将这个事件值发送给订阅它的每个订阅者:

public static void main(String[] args) {
    PublishSubject< Integer> subject = PublishSubject.create();
    subject.onNext(1);
    subject.subscribe(System.out::println);
    subject.onNext(2);
    subject.onNext(3);
    subject.onNext(4);
}

这段代码输出一个序列或系列值,包含三个数字: 2 3 4

但是,你会发现数值1并没有被输出,因为它被推送时,subject还没有被订阅为系统输出System.out::println,只有订阅以后我们才能接受到发送给它的数值。这里的subscribe是onNext的一个Function,这个函数有一个整数型参数,但是什么也不返回,一个什么也不返回的Function也称为action。

  2. ReplaySubject 是用来缓存所有推送给它的数据值,当有一个新的订阅者,那么就会为这个新的订阅者从头开始播放原来的一系列事件。当再有新的事件来时,所有的订阅者也会接受到的。

ReplaySubject< Integer> s = ReplaySubject.create();  
s.subscribe(v -> System.out.println("Early: " + v));
s.onNext(0);
s.onNext(1);
s.subscribe(v -> System.out.println("Late:  " + v)); 
s.onNext(2);

上述代码运行输出:

Early:0
Early:1
Late: 0
Late: 1
Early:2
Late: 2 

所有的事件值都被订阅者接受到了,无论它们是否在订阅之前或之后被推送的。请注意,缓存所有数据会受内存大小限制,我们需要限制缓存的大小,ReplaySubject.createWithSize是用来限制缓存大小,而ReplaySubject.createWithTime限制一个对象会被缓存多长时间。

ReplaySubject< Integer> s = ReplaySubject.createWithSize(2); 
s.onNext(0);
s.onNext(1);
s.onNext(2);
s.subscribe(v -> System.out.println("Late:  " + v)); 
s.onNext(3);

上述代码输出:

Late: 1
Late: 2
Late: 3 

订阅者没有收到数值0的第一个值,因为缓存大小是2,下面是时间限制:

ReplaySubject< Integer> s = ReplaySubject.createWithTime(150, TimeUnit.MILLISECONDS,
                                                        Schedulers.immediate());
s.onNext(0);
Thread.sleep(100);
s.onNext(1);
Thread.sleep(100);
s.onNext(2);
s.subscribe(v -> System.out.println("Late:  " + v)); 
s.onNext(3);

输出:

Late: 1
Late: 2
Late: 3 

创建ReplaySubject 需要一个Scheduler,但是RxJava会为了并发帮你在Scheduler上节省时间,可见后面的并发编程章节。

  3. BehaviorSubject是只记得最后的值,类似于将缓存大小设为1的ReplaySubject,创建时会提供一个初始值,这样能保证被订阅时总是立即有一个值可用。

BehaviorSubject< Integer> s = BehaviorSubject.create();
s.onNext(0);
s.onNext(1);
s.onNext(2);
s.subscribe(v -> System.out.println("Late:  " + v)); 
s.onNext(3);

输出:

Late: 2
Late: 3 

下面是展示onCompleted使用,这里能使用onCompleted,因为使用之前已经是最后一个事件。

BehaviorSubject< Integer> s = BehaviorSubject.create();
s.onNext(0);
s.onNext(1);
s.onNext(2);
s.onCompleted();
s.subscribe(
    v -> System.out.println("Late:  " + v),
    e -> System.out.println("Error "),
    () -> System.out.println("Completed ")
);

下面代码是提供初始值:

BehaviorSubject< Integer> s = BehaviorSubject.create(0);
s.subscribe(v -> System.out.println(v));
s.onNext(1);

输出是两个数值: 0 和1

 

  4. AsyncSubject 也会缓存最后的值,区别是,只有等整个事件系列完成时才会发送,否则一个值都不会发送。一旦完成发送一个单个值。

AsyncSubject< Integer> s = AsyncSubject.create();
s.subscribe(v -> System.out.println(v));
s.onNext(0);
s.onNext(1);
s.onNext(2);
s.onCompleted();

输出:2

如果我们这里不调用onCompleted()方法,这里就不会有任何输出。

 

隐式约定

  RxJava有一个潜在的约定,在事件中断(onError 或 onCompleted)时不会有任何事件发送,Subject实现也是遵循这样的约定,subscribe也会阻止违背约定的情况发生。

Subject< Integer,  Integer> s = ReplaySubject.create();
s.subscribe(v -> System.out.println(v));
s.onNext(0);
s.onCompleted();
s.onNext(1);
s.onNext(2);

输出是: 0

 

下篇生命周期管理

更多RxJava专题

Reactive编程专题

RxJS教程