Actors模型专题
RXJAVA的Observable和AKKA Actor综合使用
本文介绍哦如何在Akka的Actor消费者中使用RxJava的Reactive编程。
首先看一下订阅者的代码:
public class UnSubscribe implements Serializable {}
public class Subscribe implements Serializable {
private final Action1 subscription;
}
Observable actor
第二步是可观察的observable actor. 这是一个抽象类,所有可观察的Actor类都要继承它,它可以改变akka缺省场景以便接受到消息,当actor接受到一个Subscribe。
public abstract class ObservableActor extends UntypedActor {
@Override
public void onReceive(final Object o) throws Exception {
if(o instanceof Subscribe){
System.out.println("Subscribed!");
//现在改变'onReceive'缺省行为为一个匿名类
//所有新的请求将调用processMessage,然后返回给observable
// processMessage and returned to the observable as a transformation
getContext().become(new Procedure<Object>() {
@Override
public void apply(Object message) throws Exception {
if(message instanceof UnSubscribe){
getContext().unbecome();
System.out.println("Unsubscribed");
}
else{
Subscribe subscriber = (Subscribe)(o);
subscriber.getSubscription().call(processMessage(message));
}
}
});
}
else{
System.out.println("Default behavior " + o);
}
}
protected abstract Object processMessage(Object message);
}
下面是该抽象类的一个实现:
public class AkkEcho extends ObservableActor {
@Override
protected Object processMessage(Object message) {
return message;
}
}
这儿是该抽象类的另外一个实现,方法内容有所不同:
public class AkkaMapEcho extends ObservableActor {
@Override
protected Object processMessage(Object message) {
String m = (String)(message);
return m + " mapped!";
}
}
创建observable wrapper
下面是一个observable的包装器wrapper,它创建一个发布的主题subject,用来处理进来和出去的消息, 以及保证只有一个observable绑定actor. 该包装器返回的是一个安全的消费流,多个消费者可以读取它:
public class ObservableUtil {
public static <T> Observable<T> fromActor(final ActorRef actor){
final PublishSubject<T> subj = PublishSubject.create();
Observable<T> observable = Observable.create(new Observable.OnSubscribe<T>() {
@Override
public void call(final Subscriber<? super T> subscriber) {
/**
* 创建和初始化subscribe的方法,修改
* 缺省行为为代理请求转发,转发到订阅者的 'onNext'
* 当有人提交到actor, 我们拦截这个actors 的响应RESPONSE
* 然后类似管道一样导入到订阅者的工作队列.
*/
Subscribe msg = new Subscribe(new Action1<T>() {
@Override
public void call(T o) {
subscriber.onNext(o);
}
});
actor.tell(msg, ActorRef.noSender());
}
});
/**
* 创建一个有关这个Actor的订阅者,重新转发结果到
* 主题 subject (让其他订阅者订阅这个主题)
* 让akka observable不用担心管理谁是下一个消费者。
*
*/
observable.subscribe(new Action1<T>() {
@Override
public void call(T o) {
subj.onNext(o);
}
});
/**
* Return the subject's observable stream for others to subscribe on
*/
return subj.asObservable();
}
}
使用它
想象一个分布式系统中有可以接受消息的 actors,但是你要在它们上面花力气进行输出转换,或者通过observable API监听它们更方便。
这样你就有统一的基于事件行为的时间线,再也不用关系来自akka actor中的任何事件了,不管它们是如何发生,因为你已经订阅了它们。
下面是测试,有两个observables监听和截获事件:
/**
* Wrap an akka actor's behavior into an observable stream.
*
* Now your producer api is the actor, but your consumers can
* manipulate the underlying event stream to create behaviors
* @throws InterruptedException
*/
@Test
public void AkkaObservable() throws InterruptedException {
final Object mutex = new Object();
final ActorRef actor = createActorOfType(AkkEcho.class);
final List<String> results = new ArrayList<>();
final List<String> distinctResults = new ArrayList<>();
final Observable<String> observable = ObservableUtil.fromActor(actor);
observable.subscribe(new Action1<String>() {
@Override
public void call(String o) {
System.out.println(o);
if(o.equals("done")){
synchronized (mutex){ mutex.notify(); }
}
else{
results.add(o);
}
}
});
observable.distinct().subscribe(new Action1<String>() {
@Override
public void call(String o) {
distinctResults.add(o);
}
});
actor.tell("foo", ActorRef.noSender());
actor.tell("foo", ActorRef.noSender());
actor.tell("foo", ActorRef.noSender());
actor.tell("bar", ActorRef.noSender());
actor.tell("done", ActorRef.noSender());
synchronized (mutex){ mutex.wait(); }
Assert.assertEquals(results, Arrays.asList("foo", "foo", "foo", "bar"));
Assert.assertEquals(distinctResults, Arrays.asList("foo", "bar", "done"));
}
private ActorRef createActorOfType(Class<? extends Actor> clazz) {
ActorSystem system = ActorSystem.create("client");
return system.actorOf(Props.create(clazz), "rcv");
}