Actors模型专题

Akka+ZeroMQ消息生产者和消费者

  Akka使用一个Actor作为ZeroMQ的消息发布者publisher:

public class PublisherActor extends UntypedActor {
        public static final Object TICK = "TICK";
        int count = 0;
        Cancellable cancellable;
        ActorRef pubSocket = ZeroMQExtension.get(getContext().system())
                        .newPubSocket(new Bind("tcp://127.0.0.1:1237"));

        @Override
        public void preStart() {
                cancellable = getContext()
                                .system()
                                .scheduler()
                                .schedule(Duration.parse("1 second"),
                                                Duration.parse("1 second"), getSelf(), TICK);
        }

        @Override
        public void onReceive(Object message) throws Exception {
                if (message.equals(TICK)) {
                        pubSocket.tell(new ZMQMessage(new Frame("someTopic"), new Frame(
                                        "This is the workload " + ++count)));
                        
                        if(count==10)
                                cancellable.cancel();
                }
        }
}

ZeroMQ的一个监听者WorkerTaskA:

public class WorkerTaskA extends UntypedActor {
        ActorRef subSocket = ZeroMQExtension.get(getContext().system())
                        .newSubSocket(new Connect("tcp://127.0.0.1:1237"),
                                        new Listener(getSelf()), new Subscribe("someTopic"));

        LoggingAdapter log = Logging.getLogger(getContext().system(), this);

        @Override
        public void onReceive(Object message) throws Exception {

                if (message instanceof ZMQMessage) {
                        ZMQMessage m = (ZMQMessage) message;
                        String mesg = new String(m.payload(1));
                        log.info("Received Message @ A -> {}",mesg);
                }
        }
}

客户端调用代码:

public class MyActorSystem {

        /**
         * @param args
         */
        public static void main(String[] args) {
                ActorSystem system = ActorSystem.create("zeromqTest");
                system.actorOf(new Props(WorkerTaskA.class), "workerA");
                system.actorOf(new Props(WorkerTaskB.class), "workerB");
                system.actorOf(new Props(PublisherActor.class), "publisher");
        }

}

Akka+ZeroMQ源码下载

Actor也可以作为路由器,随机将消息分派给workerA或workerB:

public class RouterActor extends UntypedActor {
        public static final Object TICK = "TICK";

        Random random = new Random(3);
        int count = 0;
        Cancellable cancellable;

        ActorRef routerSocket = ZeroMQExtension.get(getContext().system())
                        .newRouterSocket(
                                        new SocketOption[] { new Listener(getSelf()),
                                                        new Bind("tcp://127.0.0.1:1237"),
                                                        new HighWatermark(50000) });
        LoggingAdapter log = Logging.getLogger(getContext().system(), this);

        @Override
        public void preStart() {
                cancellable = getContext()
                                .system()
                                .scheduler()
                                .schedule(Duration.parse("1 second"),
                                                Duration.parse("1 second"), getSelf(), TICK);
        }

        @Override
        public void onReceive(Object message) throws Exception {
                if (message.equals(TICK)) {

                        if (random.nextBoolean() == true) {
                                routerSocket.tell(new ZMQMessage(new Frame("A"), new Frame(
                                                "This is the workload for A")));
                        } else {
                                routerSocket.tell(new ZMQMessage(new Frame("B"), new Frame(
                                                "This is the workload for B")));

                        }
                        count++;
                        if (count == 10)
                                cancellable.cancel();

                } else if (message instanceof ZMQMessage) {
                        ZMQMessage m = (ZMQMessage) message;
                        String replier = new String(m.payload(0));
                        String msg = new String(m.payload(1));
                        log.info("Received message from {} with mesg -> {}", replier, msg);
                }

        }

}