Actors模型专题
Akka+ZeroMQ消息生产者和消费者
Akka使用一个Actor作为ZeroMQ的消息发布者publisher:
public class PublisherActor extends UntypedActor {public static final Object TICK = "TICK";int count = 0;Cancellable cancellable;ActorRef pubSocket = ZeroMQExtension.get(getContext().system()).newPubSocket(new Bind("tcp://127.0.0.1:1237"));@Overridepublic void preStart() {cancellable = getContext().system().scheduler().schedule(Duration.parse("1 second"),Duration.parse("1 second"), getSelf(), TICK);}@Overridepublic void onReceive(Object message) throws Exception {if (message.equals(TICK)) {pubSocket.tell(new ZMQMessage(new Frame("someTopic"), new Frame("This is the workload " + ++count)));if(count==10)cancellable.cancel();}}}
ZeroMQ的一个监听者(订阅者)WorkerTaskA:
public class WorkerTaskA extends UntypedActor {ActorRef subSocket = ZeroMQExtension.get(getContext().system()).newSubSocket(new Connect("tcp://127.0.0.1:1237"),new Listener(getSelf()), new Subscribe("someTopic"));LoggingAdapter log = Logging.getLogger(getContext().system(), this);@Overridepublic void onReceive(Object message) throws Exception {if (message instanceof ZMQMessage) {ZMQMessage m = (ZMQMessage) message;String mesg = new String(m.payload(1));log.info("Received Message @ A -> {}",mesg);}}}
客户端调用代码:
/**
 * Launcher that boots the "zeromqTest" actor system and starts the two worker actors followed
 * by the publisher actor.
 */
public class MyActorSystem {

    /**
     * Creates the actor system and registers the actors under the names "workerA", "workerB"
     * and "publisher", in that order.
     *
     * @param args command-line arguments; not read
     */
    public static void main(String[] args) {
        final ActorSystem actorSystem = ActorSystem.create("zeromqTest");

        final Props workerAProps = new Props(WorkerTaskA.class);
        final Props workerBProps = new Props(WorkerTaskB.class);
        final Props publisherProps = new Props(PublisherActor.class);

        // Workers are created before the publisher.
        actorSystem.actorOf(workerAProps, "workerA");
        actorSystem.actorOf(workerBProps, "workerB");
        actorSystem.actorOf(publisherProps, "publisher");
    }
}
Actor也可以作为路由器,随机将消息分派给workerA或workerB:
public class RouterActor extends UntypedActor {public static final Object TICK = "TICK";Random random = new Random(3);int count = 0;Cancellable cancellable;ActorRef routerSocket = ZeroMQExtension.get(getContext().system()).newRouterSocket(new SocketOption[] { new Listener(getSelf()),new Bind("tcp://127.0.0.1:1237"),new HighWatermark(50000) });LoggingAdapter log = Logging.getLogger(getContext().system(), this);@Overridepublic void preStart() {cancellable = getContext().system().scheduler().schedule(Duration.parse("1 second"),Duration.parse("1 second"), getSelf(), TICK);}@Overridepublic void onReceive(Object message) throws Exception {if (message.equals(TICK)) {if (random.nextBoolean() == true) {routerSocket.tell(new ZMQMessage(new Frame("A"), new Frame("This is the workload for A")));} else {routerSocket.tell(new ZMQMessage(new Frame("B"), new Frame("This is the workload for B")));}count++;if (count == 10)cancellable.cancel();} else if (message instanceof ZMQMessage) {ZMQMessage m = (ZMQMessage) message;String replier = new String(m.payload(0));String msg = new String(m.payload(1));log.info("Received message from {} with mesg -> {}", replier, msg);}}}