Actor Model Series

MapReduce Word Count with Akka

  The source code can be downloaded here. Prerequisite reading: Sending 10 Million Messages with Akka

The flow is as follows:

On the client side, FileReadActor reads a text file and sends each line as a message to ClientActor:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import akka.actor.UntypedActor;

public class FileReadActor extends UntypedActor {

        @Override
        public void onReceive(Object message) throws Exception {

                if (message instanceof String) {
                        String fileName = (String) message;
                        BufferedReader reader = null;
                        try {
                                // Open the file as a classpath resource
                                reader = new BufferedReader(
                                                new InputStreamReader(Thread.currentThread()
                                                                .getContextClassLoader().getResource(fileName)
                                                                .openStream()));
                                String line = null;
                                while ((line = reader.readLine()) != null) {
                                        // Send each line back to the sender (the ClientActor)
                                        getSender().tell(line);
                                }
                                System.out.println("All lines sent!");
                                // send the EOF message..
                                getSender().tell(String.valueOf("EOF"));
                        } catch (IOException x) {
                                System.err.format("IOException: %s%n", x);
                        } finally {
                                // Release the file handle even if reading failed
                                if (reader != null) {
                                        reader.close();
                                }
                        }
                } else
                        throw new IllegalArgumentException("Unknown message [" + message
                                        + "]");
        }
}

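ClientActor holds a reference to a proxy for the remote actor (WCMapReduceActor) and forwards every message it receives to that remote actor.

The client code that drives the whole flow is: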

final String fileName = "Othello.txt";

ActorSystem system = ActorSystem.create("ClientApplication",
        ConfigFactory.load().getConfig("WCMapReduceClientApp"));

// Create the FileReadActor
final ActorRef fileReadActor = system.actorOf(new Props(FileReadActor.class));

// Look up a proxy reference to the remote WCMapReduceActor
final ActorRef remoteActor = system.actorFor("akka://WCMapReduceApp@127.0.0.1:2552/user/WCMapReduceActor");

// Create the client-side ClientActor
ActorRef actor = system.actorOf(new Props(new UntypedActorFactory() {
    public UntypedActor create() {
        return new ClientActor(remoteActor);
    }
}));

// Send the file name to FileReadActor with ClientActor as the sender,
// so that the lines it reads are sent on to ClientActor
fileReadActor.tell(fileName, actor);

// Send DISPLAY_LIST to the remote actor so that it displays the result
remoteActor.tell("DISPLAY_LIST");

system.shutdown();

 

The remote server side works as follows. It creates a composite actor, WCMapReduceActor, which wraps the aggregate actor and the map and reduce actors. The setup code is:

system = ActorSystem.create("WCMapReduceApp", ConfigFactory.load()
        .getConfig("WCMapReduceApp"));

// create the aggregate Actor
aggregateActor = system.actorOf(new Props(AggregateActor.class));

// create the list of reduce Actors
reduceRouter = system.actorOf(new Props(new UntypedActorFactory() {
    public UntypedActor create() {
        return new ReduceActor(aggregateActor);
    }
}).withRouter(new RoundRobinRouter(no_of_reduce_workers)));

// create the list of map Actors
mapRouter = system.actorOf(new Props(new UntypedActorFactory() {
    public UntypedActor create() {
        return new MapActor(reduceRouter);
    }
}).withRouter(new RoundRobinRouter(no_of_map_workers)));

// create the overall WCMapReduce Actor that acts as the remote actor
// for clients; it is given aggregateActor and mapRouter
wcMapReduceActor = system.actorOf(new Props(new UntypedActorFactory() {
    public UntypedActor create() {
        return new WCMapReduceActor(aggregateActor, mapRouter);
    }
}).withDispatcher("priorityMailBox-dispatcher"), "WCMapReduceActor");

Note that the last statement, which creates WCMapReduceActor, takes two actors as input, aggregateActor and mapRouter, and uses the dispatcher priorityMailBox-dispatcher.

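The remote WCMapReduceActor accepts two kinds of messages: lines of text to be processed by MapReduce, and the "DISPLAY_LIST" message shown above, which asks for the result to be displayed. A PriorityMailBox assigns each message a priority and so separates the MapReduce requests from "DISPLAY_LIST":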

public static class MyPriorityMailBox extends UnboundedPriorityMailbox {

                public MyPriorityMailBox(ActorSystem.Settings settings, Config config) {

                        // Creating a new PriorityGenerator,
                        super(new PriorityGenerator() {
                                @Override
                                public int gen(Object message) {
                                        if (message.equals("DISPLAY_LIST"))
                                                return 2; // DISPLAY_LIST messages should be handled
                                                                        // last if possible
                                        else if (message.equals(PoisonPill.getInstance()))
                                                return 3; // PoisonPill when no other left
                                        else
                                                return 0; // By default they go with high priority
                                }
                        });
                }

        }
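
The priority rule can be tried outside Akka. The following is a minimal plain-Java sketch, not the Akka mailbox itself: it reuses the same numbers as gen (a lower number is handled earlier) and feeds them to java.util.PriorityQueue. The names PriorityOrderSketch, POISON and drainInPriorityOrder are hypothetical, and the string "POISON_PILL" is only a stand-in for Akka's PoisonPill.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Plain-Java illustration of the priority rule; all names here are hypothetical.
final class PriorityOrderSketch {

    // Stand-in for Akka's PoisonPill so the sketch needs no Akka dependency.
    static final String POISON = "POISON_PILL";

    // Same numbers as the article's gen method: lower value = handled earlier.
    static int gen(Object message) {
        if ("DISPLAY_LIST".equals(message)) {
            return 2;
        } else if (POISON.equals(message)) {
            return 3;
        }
        return 0;
    }

    // Queues all arrivals at once, then removes them in priority order.
    static List<Object> drainInPriorityOrder(List<Object> arrivals) {
        Comparator<Object> byPriority = Comparator.comparingInt(PriorityOrderSketch::gen);
        PriorityQueue<Object> mailbox = new PriorityQueue<>(11, byPriority);
        mailbox.addAll(arrivals);
        List<Object> handled = new ArrayList<>();
        while (!mailbox.isEmpty()) {
            handled.add(mailbox.poll());
        }
        return handled;
    }

    public static void main(String[] args) {
        List<Object> arrivals = Arrays.<Object>asList(
                POISON, "DISPLAY_LIST", "first line", "second line");
        // The two text lines come out first, then DISPLAY_LIST, then the poison stand-in.
        System.out.println(drainInPriorityOrder(arrivals));
    }
}
```

Two caveats apply to this sketch. PriorityQueue makes no promise about the order of entries with equal priority, so the two text lines may come out in either order. A priority queue can also only reorder messages that are waiting at the same time, so a "DISPLAY_LIST" that arrives while the mailbox is empty is handled immediately.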

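That covers how WCMapReduceActor is created. Its internal code is as follows:

The remote WCMapReduceActor either passes a message to a MapActor (through the RoundRobinRouter) so that its words are counted, or passes it to the aggregate actor to obtain the result.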

public class WCMapReduceActor extends UntypedActor {

        private ActorRef mapRouter;
        private ActorRef aggregateActor;

        public void onReceive(Object message) {
                if (message instanceof String) {
                        if (((String) message).compareTo("DISPLAY_LIST") == 0) {
                                System.out.println("Got Display Message");
                                aggregateActor.tell(message, getSender()); // pass the message to the aggregate actor
                        } else {
                                mapRouter.tell(message); // pass the message to a MapActor
                        }
                }
        }

        public WCMapReduceActor(ActorRef inAggregateActor, ActorRef inMapRouter) {

                mapRouter = inMapRouter;
                aggregateActor = inAggregateActor;
        }

}

Next, here is how the MapActor instances behind mapRouter handle each line:

public void onReceive(Object message) {
                if (message instanceof String) {
                        String work = (String) message;

                        // perform the work
                        List<Result> list = evaluateExpression(work);

                        // reply with the result
                        actor.tell(list);

                } else
                        throw new IllegalArgumentException("Unknown message [" + message
                                        + "]");
        }

The MapActor first splits the line into words with evaluateExpression, then sends the resulting list as a message to the reduce actor.
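
The body of evaluateExpression is not shown here. As a rough illustration of what a map step of this kind usually does, the sketch below splits a line into lowercase words and emits a (word, 1) pair for each. The names MapStepSketch, WordCount and mapLine are hypothetical, WordCount only stands in for the Result type, and the tokenization rule is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Plain-Java sketch of a map step; every name in this class is hypothetical.
final class MapStepSketch {

    // Stand-in for the article's Result type: one word with its count.
    static final class WordCount {
        final String word;
        final int count;

        WordCount(String word, int count) {
            this.word = word;
            this.count = count;
        }
    }

    // Splits a line into lowercase words and emits (word, 1) for each one.
    static List<WordCount> mapLine(String line) {
        List<WordCount> out = new ArrayList<>();
        // Assumed rule: anything that is not a letter, digit or apostrophe separates words.
        for (String token : line.toLowerCase(Locale.ROOT).split("[^\\p{L}\\p{N}']+")) {
            if (!token.isEmpty()) { // split can yield an empty leading token
                out.add(new WordCount(token, 1));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        for (WordCount wc : mapLine("To be, or not to be")) {
            System.out.println(wc.word + " -> " + wc.count);
        }
    }
}
```

Repeated words are deliberately left as separate pairs at this stage; combining them is the job of the reduce step.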

ReduceActor looks like this:

public void onReceive(Object message) throws Exception {
                if (message instanceof List) {

                        @SuppressWarnings("unchecked")
                        List<Result> work = (List<Result>) message;

                        // perform the work
                        NavigableMap<String, Integer> reducedList = reduce(work);

                        // reply with the result
                        actor.tell(reducedList);

                } else
                        throw new IllegalArgumentException("Unknown message [" + message
                                        + "]");
        }

It counts the words with the reduce method, then sends the result to the aggregate actor.

The aggregate actor accumulates the final result:
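
The reduce method itself is not listed. One plausible shape for it, sketched under that assumption, is to sum the counts per word into a sorted map. ReduceStepSketch is a hypothetical name, and Map.Entry<String, Integer> stands in for the Result type so that the example stays self-contained.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Plain-Java sketch of a reduce step; the class name is hypothetical.
final class ReduceStepSketch {

    // Sums the counts of equal words; TreeMap keeps the keys in sorted order.
    static NavigableMap<String, Integer> reduce(List<Map.Entry<String, Integer>> pairs) {
        NavigableMap<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            // merge inserts the value if the key is new, otherwise adds to the old value
            counts.merge(pair.getKey(), pair.getValue(), Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : new String[] {"to", "be", "or", "not", "to", "be"}) {
            pairs.add(new SimpleEntry<>(word, 1));
        }
        System.out.println(reduce(pairs)); // prints {be=2, not=1, or=1, to=2}
    }
}
```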

public class AggregateActor extends UntypedActor {

        private Map<String, Integer> finalReducedMap = new HashMap<String, Integer>();

        @Override
        public void onReceive(Object message) throws Exception {
                if (message instanceof Map) {
                        @SuppressWarnings("unchecked")
                        Map<String, Integer> reducedList = (Map<String, Integer>) message;
                        aggregateInMemoryReduce(reducedList);
                } else if (message instanceof String) {
                        if (((String) message).compareTo("DISPLAY_LIST") == 0) {
                                //getSender().tell(finalReducedMap.toString());
                                System.out.println(finalReducedMap.toString());
                               
                        }
                }
        }

        private void aggregateInMemoryReduce(Map<String, Integer> reducedList) {

                Iterator<String> iter = reducedList.keySet().iterator();
                while (iter.hasNext()) {
                        String key = iter.next();
                        if (finalReducedMap.containsKey(key)) {
                                Integer count = reducedList.get(key) + finalReducedMap.get(key);
                                finalReducedMap.put(key, count);
                        } else {
                                finalReducedMap.put(key, reducedList.get(key));
                        }

                }
        }

}
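
The merge loop in aggregateInMemoryReduce can be checked in isolation. The sketch below is a plain-Java equivalent without the actor around it; it uses Map.merge (Java 8 and later) in place of the containsKey/put pair. AggregateSketch, add and totals are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of the aggregation step; all names are hypothetical.
final class AggregateSketch {

    private final Map<String, Integer> totals = new HashMap<>();

    // Folds one partial result into the running totals.
    void add(Map<String, Integer> partial) {
        for (Map.Entry<String, Integer> entry : partial.entrySet()) {
            // Same effect as the containsKey/put branches in the actor version
            totals.merge(entry.getKey(), entry.getValue(), Integer::sum);
        }
    }

    Map<String, Integer> totals() {
        return totals;
    }

    public static void main(String[] args) {
        Map<String, Integer> first = new HashMap<>();
        first.put("to", 2);
        first.put("be", 2);

        Map<String, Integer> second = new HashMap<>();
        second.put("to", 1);
        second.put("sleep", 1);

        AggregateSketch aggregate = new AggregateSketch();
        aggregate.add(first);
        aggregate.add(second);
        // HashMap iteration order is unspecified, so only the counts are meaningful.
        System.out.println(aggregate.totals());
    }
}
```

In the actor version no locking is needed around finalReducedMap, because an actor processes one message at a time; the same class used from several threads directly would need its own synchronization.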


 
