Actors模型专题

RXJAVA的Observable和AKKA Actor综合使用

  本文介绍哦如何在Akka的Actor消费者中使用RxJava的Reactive编程。

  首先看一下订阅者的代码:

public class UnSubscribe implements Serializable {}

public class Subscribe implements Serializable {
    private final Action1 subscription;
}

Observable actor

  第二步是可观察的observable actor. 这是一个抽象类,所有可观察的Actor类都要继承它,它可以改变akka缺省场景以便接受到消息,当actor接受到一个Subscribe。

public abstract class ObservableActor extends UntypedActor {
 
    @Override
    public void onReceive(final Object o) throws Exception {
        if(o instanceof Subscribe){
            System.out.println("Subscribed!");
 
            //现在改变'onReceive'缺省行为为一个匿名类
            //所有新的请求将调用processMessage,然后返回给observable
            // processMessage and returned to the observable as a transformation
            getContext().become(new Procedure<Object>() {
                @Override
                public void apply(Object message) throws Exception {
                    if(message instanceof UnSubscribe){
                        getContext().unbecome();
 
                        System.out.println("Unsubscribed");
                    }
                    else{
                        Subscribe subscriber = (Subscribe)(o);
 
                        subscriber.getSubscription().call(processMessage(message));
                    }
                }
            });
        }
        else{
            System.out.println("Default behavior " + o);
        }
 
    }
 
    protected abstract Object processMessage(Object message);
}

 

下面是该抽象类的一个实现:

public class AkkEcho extends ObservableActor {
    @Override
    protected Object processMessage(Object message) {
        return message;
    }
}

这儿是该抽象类的另外一个实现,方法内容有所不同:

public class AkkaMapEcho extends ObservableActor {
    @Override
    protected Object processMessage(Object message) {
        String m = (String)(message);
 
        return m + " mapped!";
    }
}

 

创建observable wrapper

  下面是一个observable的包装器wrapper,它创建一个发布的主题subject,用来处理进来和出去的消息, 以及保证只有一个observable绑定actor. 该包装器返回的是一个安全的消费流,多个消费者可以读取它:

public class ObservableUtil {
 
    public static <T> Observable<T> fromActor(final ActorRef actor){
        final PublishSubject<T> subj = PublishSubject.create();
 
        Observable<T> observable = Observable.create(new Observable.OnSubscribe<T>() {
            @Override
            public void call(final Subscriber<? super T> subscriber) {
 
                /**
                 * 创建和初始化subscribe的方法,修改
                 * 缺省行为为代理请求转发,转发到订阅者的 'onNext'  
                 * 当有人提交到actor, 我们拦截这个actors 的响应RESPONSE
                 * 然后类似管道一样导入到订阅者的工作队列.
                 */
                Subscribe msg = new Subscribe(new Action1<T>() {
                    @Override
                    public void call(T o) {
                        subscriber.onNext(o);
                    }
                });
 
                actor.tell(msg, ActorRef.noSender());
            }
        });
 
        /**
         * 创建一个有关这个Actor的订阅者,重新转发结果到
         * 主题 subject (让其他订阅者订阅这个主题)
         * 让akka observable不用担心管理谁是下一个消费者。
         *
         */
        observable.subscribe(new Action1<T>() {
            @Override
            public void call(T o) {
                subj.onNext(o);
            }
        });
 
        /**
         * Return the subject's observable stream for others to subscribe on
         */
        return subj.asObservable();
    }
}

 

使用它

  想象一个分布式系统中有可以接受消息的 actors,但是你要在它们上面花力气进行输出转换,或者通过observable API监听它们更方便。

  这样你就有统一的基于事件行为的时间线,再也不用关系来自akka actor中的任何事件了,不管它们是如何发生,因为你已经订阅了它们。

  下面是测试,有两个observables监听和截获事件:

/**
 * Wrap an akka actor's behavior into an observable stream.
 *
 * Now your producer api is the actor, but your consumers can
 * manipulate the underlying event stream to create behaviors
 * @throws InterruptedException
 */
@Test
public void AkkaObservable() throws InterruptedException {
    final Object mutex = new Object();
 
    final ActorRef actor = createActorOfType(AkkEcho.class);
 
    final List<String> results = new ArrayList<>();
    final List<String> distinctResults = new ArrayList<>();
 
    final Observable<String> observable = ObservableUtil.fromActor(actor);
 
    observable.subscribe(new Action1<String>() {
        @Override
        public void call(String o) {
            System.out.println(o);
 
            if(o.equals("done")){
                synchronized (mutex){ mutex.notify(); }
            }
            else{
                results.add(o);
            }
        }
    });
 
    observable.distinct().subscribe(new Action1<String>() {
        @Override
        public void call(String o) {
            distinctResults.add(o);
        }
    });
 
    actor.tell("foo",  ActorRef.noSender());
    actor.tell("foo",  ActorRef.noSender());
    actor.tell("foo",  ActorRef.noSender());
    actor.tell("bar",  ActorRef.noSender());
    actor.tell("done", ActorRef.noSender());
 
    synchronized (mutex){ mutex.wait(); }
 
    Assert.assertEquals(results, Arrays.asList("foo", "foo", "foo", "bar"));
    Assert.assertEquals(distinctResults, Arrays.asList("foo", "bar", "done"));
}
 
private ActorRef createActorOfType(Class<? extends Actor> clazz) {
    ActorSystem system = ActorSystem.create("client");
 
    return system.actorOf(Props.create(clazz), "rcv");
}

 

 

 

 

猜你喜欢