RxJava教程
RxJava是一个通过可观察的序列方式实现异步和基于事件的响应式Reactive编程,对一系列事件能够进行组合和消费使用,其Github地址见这里。
理解RxJava主要首先要了解什么GOF的观察者模式,它是拓展了传统的观察者模式,支持一系列数据或事件,针对这些事件提供响应的激活与操作方法。
观察者模式有两个主要的角色:可观察者Observable和观察者Observer。可观察者Observable也就是被观察者,它是用来被观察者观察的,观察者一般首先会订阅一些被观察者,也就是说,这些被观察者一旦发生变化,应该及时通知观察者,正如我们订阅杂志一样,一旦有新的杂志会自动发送给我们。
被观察者
RXJava的Observable 是第一个核心类,代表被观察者,其中有一个重要方法订阅:
public final Subscription subscribe(Subscriber<? super T> subscriber)
这是一个用来接收被观察者发出数据的方法,也就是说,被观察者一旦发生变化,其事件值是通过这个方法发送给所有的订阅者(观察者),Subscriber 是接口Observer 的一个实现。
一个可观察者发送三种事件:
- 数据值
- 完成Completion, 用来表示再也没有更多数据推送了。
- 错误Errors, 如果在推送期间发生错误,这一系列事件意味着中断了。
观察者
Observer是RxJava的观察者,Subscriber 是其子类,另外增加了一些额外功能,应该算是观察者的基本实现,所以,我们必须首先了解这个接口:
interface Observer<T> {
void onCompleted();
void onError(java.lang .Throwable e);
void onNext(T t);
}
这三个方法会在可观察者observable每次推送数据值时都会激活执行,onNext方法可任意多次数调用,onCompleted 或 onError是可选的,onCompleted 或 onError只能调用一次。
你可以手工实现Observer接口,或继承Observable,但是实战中通常不必要,因为RxJava已经提供了你需要的很多构建代码。为了订阅一个被观察者Observable,你实际上不必提供Observer实例,这是区别于通常观察者模式。
Subject
Subject主题是可观察者Observable的一个拓展,同时实现了Observer接口,也就是说,通过引入Subject,我们将可观察者和观察者联系起来,这样主要是为了简化,Subject能像观察者那样接受发送给它们的事件,也能像可观察者一样将事件发送给自己的订阅者。Subject能成为RxJava的理想入口,当你有来自Rx外部的事件数据值时,你能将它们推送到一个Subject,把它们转为一个可观察者(被观察者),由此可以作为Rx整个管道连的切入点。这个概念很有函数编程的味道。
Subject有两个参数类型:输入类型和输出类型,这是来自函数编程中纯函数的概念,Subject有许多不同实现,我们看看主要的实现子类:
1. PublishSubject 是最直接主要的Subject实现,当一个事件值被发送给PublishSubject时,它会将这个事件值发送给订阅它的每个订阅者:
public static void main(String[] args) {
PublishSubject< Integer> subject = PublishSubject.create();
subject.onNext(1);
subject.subscribe(System.out::println);
subject.onNext(2);
subject.onNext(3);
subject.onNext(4);
}
这段代码输出一个序列或系列值,包含三个数字: 2 3 4
但是,你会发现数值1并没有被输出,因为它被推送时,subject还没有被订阅为系统输出System.out::println,只有订阅以后我们才能接受到发送给它的数值。这里的subscribe是onNext的一个Function,这个函数有一个整数型参数,但是什么也不返回,一个什么也不返回的Function也称为action。
2. ReplaySubject 是用来缓存所有推送给它的数据值,当有一个新的订阅者,那么就会为这个新的订阅者从头开始播放原来的一系列事件。当再有新的事件来时,所有的订阅者也会接受到的。
ReplaySubject< Integer> s = ReplaySubject.create();
s.subscribe(v -> System.out.println("Early: " + v));
s.onNext(0);
s.onNext(1);
s.subscribe(v -> System.out.println("Late: " + v));
s.onNext(2);
上述代码运行输出:
Early:0
Early:1
Late: 0
Late: 1
Early:2
Late: 2
所有的事件值都被订阅者接受到了,无论它们是否在订阅之前或之后被推送的。请注意,缓存所有数据会受内存大小限制,我们需要限制缓存的大小,ReplaySubject.createWithSize是用来限制缓存大小,而ReplaySubject.createWithTime限制一个对象会被缓存多长时间。
ReplaySubject< Integer> s = ReplaySubject.createWithSize(2);
s.onNext(0);
s.onNext(1);
s.onNext(2);
s.subscribe(v -> System.out.println("Late: " + v));
s.onNext(3);
上述代码输出:
Late: 1
Late: 2
Late: 3
订阅者没有收到数值0的第一个值,因为缓存大小是2,下面是时间限制:
ReplaySubject< Integer> s = ReplaySubject.createWithTime(150, TimeUnit.MILLISECONDS,
Schedulers.immediate());
s.onNext(0);
Thread.sleep(100);
s.onNext(1);
Thread.sleep(100);
s.onNext(2);
s.subscribe(v -> System.out.println("Late: " + v));
s.onNext(3);
输出:
Late: 1
Late: 2
Late: 3
创建ReplaySubject 需要一个Scheduler,但是RxJava会为了并发帮你在Scheduler上节省时间,可见后面的并发编程章节。
3. BehaviorSubject是只记得最后的值,类似于将缓存大小设为1的ReplaySubject,创建时会提供一个初始值,这样能保证被订阅时总是立即有一个值可用。
BehaviorSubject< Integer> s = BehaviorSubject.create();
s.onNext(0);
s.onNext(1);
s.onNext(2);
s.subscribe(v -> System.out.println("Late: " + v));
s.onNext(3);
输出:
Late: 2
Late: 3
下面是展示onCompleted使用,这里能使用onCompleted,因为使用之前已经是最后一个事件。
BehaviorSubject< Integer> s = BehaviorSubject.create();
s.onNext(0);
s.onNext(1);
s.onNext(2);
s.onCompleted();
s.subscribe(
v -> System.out.println("Late: " + v),
e -> System.out.println("Error "),
() -> System.out.println("Completed ")
);
下面代码是提供初始值:
BehaviorSubject< Integer> s = BehaviorSubject.create(0);
s.subscribe(v -> System.out.println(v));
s.onNext(1);
输出是两个数值: 0 和1
4. AsyncSubject 也会缓存最后的值,区别是,只有等整个事件系列完成时才会发送,否则一个值都不会发送。一旦完成发送一个单个值。
AsyncSubject< Integer> s = AsyncSubject.create();
s.subscribe(v -> System.out.println(v));
s.onNext(0);
s.onNext(1);
s.onNext(2);
s.onCompleted();
输出:2
如果我们这里不调用onCompleted()方法,这里就不会有任何输出。
隐式约定
RxJava有一个潜在的约定,在事件中断(onError 或 onCompleted)时不会有任何事件发送,Subject实现也是遵循这样的约定,subscribe也会阻止违背约定的情况发生。
Subject< Integer, Integer> s = ReplaySubject.create();
s.subscribe(v -> System.out.println(v));
s.onNext(0);
s.onCompleted();
s.onNext(1);
s.onNext(2);
输出是: 0