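Actor Model Series
Word Count with MapReduce in Akka
Source code: download here. Background reading: Sending 10 Million Messages with Akka.
The flow is as follows:
The client system (FileReadActor) reads a text file and sends each of its lines as a message to ClientActor: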
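import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;

import akka.actor.UntypedActor;

public class FileReadActor extends UntypedActor {
    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof String) {
            String fileName = (String) message;
            // look the file up on the classpath; getResource returns null if it is missing
            URL resource = Thread.currentThread().getContextClassLoader().getResource(fileName);
            if (resource == null) {
                System.err.println("File not found on classpath: " + fileName);
                return;
            }
            // try-with-resources (Java 7+) closes the reader when reading is done
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(resource.openStream()))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    getSender().tell(line); // one message per line
                }
                System.out.println("All lines sent!");
                // send the EOF marker so the receiver knows the input is complete
                getSender().tell("EOF");
            } catch (IOException x) {
                System.err.format("IOException: %s%n", x);
            }
        } else {
            throw new IllegalArgumentException("Unknown message [" + message + "]");
        }
    }
}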
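The line-by-line protocol above can be sketched in plain Java without Akka. In this hypothetical sketch a Consumer<String> stands in for getSender().tell(...), and the names LineEmitterSketch and emitLines are illustrative only; it shows just the message sequence: one message per line, then a single "EOF".

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java sketch of FileReadActor's protocol. The Consumer is a stand-in
// for getSender().tell(...); it is NOT an Akka API.
class LineEmitterSketch {
    static final String EOF = "EOF";

    static void emitLines(BufferedReader reader, Consumer<String> send) throws IOException {
        // try-with-resources closes the reader even if readLine() throws
        try (BufferedReader r = reader) {
            String line;
            while ((line = r.readLine()) != null) {
                send.accept(line); // one message per line
            }
        }
        send.accept(EOF); // tell the receiver that the input is exhausted
    }

    public static void main(String[] args) throws IOException {
        List<String> received = new ArrayList<>();
        emitLines(new BufferedReader(new StringReader("to be\nor not")), received::add);
        System.out.println(received); // [to be, or not, EOF]
    }
}
```

Because the end marker is an ordinary string, a file containing a line that is exactly "EOF" would be mistaken for the end of input on the receiving side; a dedicated message class would avoid that.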
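ClientActor holds a reference to the remote actor proxy (WCMapReduceActor) and forwards the messages it receives to that remote actor.
The client-side code that drives the whole flow: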
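final String fileName = "Othello.txt";
ActorSystem system = ActorSystem.create("ClientApplication",
        ConfigFactory.load().getConfig("WCMapReduceClientApp"));
// create the FileReadActor
final ActorRef fileReadActor = system.actorOf(new Props(FileReadActor.class));
// create a proxy ActorRef that points at the remote WCMapReduceActor
final ActorRef remoteActor = system.actorFor(
        "akka://WCMapReduceApp@127.0.0.1:2552/user/WCMapReduceActor");
// create the client-side ClientActor
ActorRef actor = system.actorOf(new Props(new UntypedActorFactory() {
    public UntypedActor create() {
        return new ClientActor(remoteActor);
    }
}));
// ask FileReadActor to read the file; it sends each line to ClientActor (the sender)
fileReadActor.tell(fileName, actor);
// ask the remote actor to display the result with DISPLAY_LIST.
// Caution: this is sent right away, while the lines are still travelling through
// FileReadActor and ClientActor, so it can arrive before all of them; the priority
// mailbox only reorders messages that are already queued together.
remoteActor.tell("DISPLAY_LIST");
// Caution: shutting down immediately can stop the local actors before they have
// read and forwarded every line; a real client should wait (for example until
// ClientActor has seen "EOF") before calling shutdown().
system.shutdown();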
On the remote server side, the following code creates WCMapReduceActor, a composite actor that wraps the aggregate actor and the MapReduce actors:
system = ActorSystem.create("WCMapReduceApp", ConfigFactory.load()
        .getConfig("WCMapReduceApp"));
// create the aggregate actor
aggregateActor = system.actorOf(new Props(AggregateActor.class));
// create the pool of reduce actors behind a round-robin router
reduceRouter = system.actorOf(new Props(new UntypedActorFactory() {
    public UntypedActor create() {
        return new ReduceActor(aggregateActor);
    }
}).withRouter(new RoundRobinRouter(no_of_reduce_workers)));
// create the pool of map actors behind a round-robin router
mapRouter = system.actorOf(new Props(new UntypedActorFactory() {
    public UntypedActor create() {
        return new MapActor(reduceRouter);
    }
}).withRouter(new RoundRobinRouter(no_of_map_workers)));
// create the overall WCMapReduceActor that clients call remotely;
// it wraps aggregateActor and mapRouter
wcMapReduceActor = system.actorOf(new Props(new UntypedActorFactory() {
    public UntypedActor create() {
        return new WCMapReduceActor(aggregateActor, mapRouter);
    }
}).withDispatcher("priorityMailBox-dispatcher"), "WCMapReduceActor");
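RoundRobinRouter hands successive messages to its routees in turn. The following plain-Java sketch models only that ordering: message i goes to routee i mod n. RoundRobinSketch is a hypothetical name, not the Akka API; unlike Akka's router it is not thread-safe, and its routees are plain Consumers rather than actors.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Models the round-robin routing order only, not actor concurrency.
class RoundRobinSketch<T> {
    private final List<Consumer<T>> routees;
    private int next = 0; // index of the routee that gets the next message

    RoundRobinSketch(List<Consumer<T>> routees) {
        if (routees.isEmpty()) throw new IllegalArgumentException("need at least one routee");
        this.routees = routees;
    }

    void tell(T message) {
        routees.get(next).accept(message);
        next = (next + 1) % routees.size(); // wrap around after the last routee
    }

    public static void main(String[] args) {
        List<List<String>> inboxes = new ArrayList<>();
        List<Consumer<String>> routees = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            List<String> inbox = new ArrayList<>();
            inboxes.add(inbox);
            routees.add(inbox::add);
        }
        RoundRobinSketch<String> router = new RoundRobinSketch<>(routees);
        for (String line : new String[] {"l1", "l2", "l3", "l4"}) router.tell(line);
        System.out.println(inboxes); // [[l1, l4], [l2], [l3]]
    }
}
```

Round-robin spreads lines evenly across the MapActors regardless of their content, so the same word can end up counted by several ReduceActors; that is why a final AggregateActor has to merge their partial maps.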
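Note that the last statement creates WCMapReduceActor with two actors as constructor arguments, aggregateActor and mapRouter, and runs it on the dispatcher priorityMailBox-dispatcher (defined in the configuration file, which is not shown here). The reason is that the remote WCMapReduceActor accepts two kinds of messages: lines of text to be processed by MapReduce, and the "DISPLAY_LIST" message shown above, which asks for the result. A priority mailbox separates the two by ordering the queued messages: lower values are dequeued first, so ordinary work messages (0) go ahead of "DISPLAY_LIST" (2), and PoisonPill (3) comes last.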
public static class MyPriorityMailBox extends UnboundedPriorityMailbox {
    public MyPriorityMailBox(ActorSystem.Settings settings, Config config) {
        // Create a new PriorityGenerator; lower values are dequeued first
        super(new PriorityGenerator() {
            @Override
            public int gen(Object message) {
                if (message.equals("DISPLAY_LIST"))
                    return 2; // DISPLAY_LIST messages should be handled last if possible
                else if (message.equals(PoisonPill.getInstance()))
                    return 3; // PoisonPill only when nothing else is left
                else
                    return 0; // everything else gets the highest priority
            }
        });
    }
}
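The effect of MyPriorityMailBox can be modelled with java.util.concurrent.PriorityBlockingQueue using the same priority values. This is a hypothetical sketch, not Akka's mailbox implementation: the string "PoisonPill" stands in for PoisonPill.getInstance(), and a sequence number is added so that equal-priority messages keep their arrival order, which the Akka documentation says UnboundedPriorityMailbox does not guarantee.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Models the ordering of a priority mailbox: lower priority values come out first,
// ties are broken by arrival order (an addition of this sketch).
class PriorityMailboxSketch {
    static final String POISON_PILL = "PoisonPill"; // stand-in for PoisonPill.getInstance()

    // same priority rule as MyPriorityMailBox.gen
    static int gen(Object message) {
        if ("DISPLAY_LIST".equals(message)) return 2;
        if (POISON_PILL.equals(message)) return 3;
        return 0;
    }

    private static final class Envelope implements Comparable<Envelope> {
        final Object message;
        final int priority;
        final long seq;

        Envelope(Object message, long seq) {
            this.message = message;
            this.priority = gen(message);
            this.seq = seq;
        }

        @Override
        public int compareTo(Envelope o) {
            int c = Integer.compare(priority, o.priority);
            return c != 0 ? c : Long.compare(seq, o.seq);
        }
    }

    private final PriorityBlockingQueue<Envelope> queue = new PriorityBlockingQueue<>();
    private final AtomicLong seq = new AtomicLong();

    void enqueue(Object message) {
        queue.add(new Envelope(message, seq.getAndIncrement()));
    }

    // remove every queued message in the order the mailbox would deliver them
    List<Object> drain() {
        List<Object> out = new ArrayList<>();
        Envelope e;
        while ((e = queue.poll()) != null) out.add(e.message);
        return out;
    }

    public static void main(String[] args) {
        PriorityMailboxSketch box = new PriorityMailboxSketch();
        for (Object m : new Object[] {"DISPLAY_LIST", "line 1", POISON_PILL, "line 2"}) box.enqueue(m);
        System.out.println(box.drain()); // [line 1, line 2, DISPLAY_LIST, PoisonPill]
    }
}
```

The reordering only applies to messages that are waiting in the mailbox at the same time. If "DISPLAY_LIST" arrives while the mailbox is empty, it is processed immediately, even if more lines are still on their way.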
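That is how WCMapReduceActor is created. Its code is as follows.
The remote WCMapReduceActor either forwards a message to the MapActors (through the RoundRobinRouter) to count its words, or forwards it to the aggregate actor to obtain the computed result.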
public class WCMapReduceActor extends UntypedActor {
    private final ActorRef mapRouter;
    private final ActorRef aggregateActor;

    public WCMapReduceActor(ActorRef inAggregateActor, ActorRef inMapRouter) {
        mapRouter = inMapRouter;
        aggregateActor = inAggregateActor;
    }

    @Override
    public void onReceive(Object message) {
        if (message instanceof String) {
            if ("DISPLAY_LIST".equals(message)) {
                System.out.println("Got Display Message");
                aggregateActor.tell(message, getSender()); // forward to the aggregate actor
            } else {
                mapRouter.tell(message); // hand the line to one of the MapActors
            }
        } else {
            unhandled(message); // let Akka report anything unexpected
        }
    }
}
Next, here is how the MapActors behind mapRouter process the text (MapActor.onReceive):
public void onReceive(Object message) {
    if (message instanceof String) {
        String work = (String) message;
        // perform the work: split the line into words
        List<Result> list = evaluateExpression(work);
        // send the result on; actor holds the reduceRouter passed in via new MapActor(reduceRouter)
        actor.tell(list);
    } else {
        throw new IllegalArgumentException("Unknown message [" + message + "]");
    }
}
MapActor first splits the line into words with evaluateExpression, then sends the resulting list as a message to the reduce actors.
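The article does not show evaluateExpression or the Result class, so the following is only a guess at what the map step might look like: it lower-cases the line, splits it on runs of non-letter characters, and emits a (word, 1) pair per token. MapStepSketch and this Result class are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical map step: one (word, 1) pair per word in the line.
class MapStepSketch {
    static final class Result {
        final String word;
        final int count;

        Result(String word, int count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + "=" + count;
        }
    }

    static List<Result> evaluateExpression(String line) {
        List<Result> results = new ArrayList<>();
        // split on runs of non-letters; a leading separator yields an empty first token
        for (String token : line.toLowerCase(Locale.ROOT).split("[^\\p{L}]+")) {
            if (!token.isEmpty()) results.add(new Result(token, 1));
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(evaluateExpression("O, the Moor! the Moor."));
        // [o=1, the=1, moor=1, the=1, moor=1]
    }
}
```

With this tokenization "Othello's" becomes "othello" and "s"; a real implementation may choose different rules.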
ReduceActor looks like this:
public void onReceive(Object message) throws Exception {
    if (message instanceof List) {
        @SuppressWarnings("unchecked")
        List<Result> work = (List<Result>) message;
        // perform the work: count the words in this batch
        NavigableMap<String, Integer> reducedList = reduce(work);
        // send the partial counts on; actor holds the aggregateActor passed in via new ReduceActor(aggregateActor)
        actor.tell(reducedList);
    } else {
        throw new IllegalArgumentException("Unknown message [" + message + "]");
    }
}
The reduce method counts the words in the batch, and the resulting map is then sent to the aggregate actor.
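The body of reduce is not shown either. A plausible sketch sums the counts of identical words within one mapped batch into a TreeMap, which satisfies the NavigableMap return type used by ReduceActor. Map.Entry pairs stand in for Result here; ReduceStepSketch and pair are hypothetical names.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical reduce step: per-batch word totals, sorted by word.
class ReduceStepSketch {
    // builds a (word, count) pair; stands in for the article's Result class
    static Map.Entry<String, Integer> pair(String word, int count) {
        return new AbstractMap.SimpleEntry<>(word, count);
    }

    static NavigableMap<String, Integer> reduce(List<Map.Entry<String, Integer>> work) {
        NavigableMap<String, Integer> reduced = new TreeMap<>();
        for (Map.Entry<String, Integer> r : work) {
            reduced.merge(r.getKey(), r.getValue(), Integer::sum); // add to any existing count
        }
        return reduced;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> mapped =
                Arrays.asList(pair("the", 1), pair("moor", 1), pair("the", 1));
        System.out.println(reduce(mapped)); // {moor=1, the=2}
    }
}
```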
The aggregate actor computes the final result:
public class AggregateActor extends UntypedActor {
    private Map<String, Integer> finalReducedMap = new HashMap<String, Integer>();

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof Map) {
            @SuppressWarnings("unchecked")
            Map<String, Integer> reducedList = (Map<String, Integer>) message;
            aggregateInMemoryReduce(reducedList);
        } else if (message instanceof String) {
            if (((String) message).compareTo("DISPLAY_LIST") == 0) {
                //getSender().tell(finalReducedMap.toString());
                System.out.println(finalReducedMap.toString());
            }
        }
    }

    private void aggregateInMemoryReduce(Map<String, Integer> reducedList) {
        Iterator<String> iter = reducedList.keySet().iterator();
        while (iter.hasNext()) {
            String key = iter.next();
            if (finalReducedMap.containsKey(key)) {
                Integer count = reducedList.get(key) + finalReducedMap.get(key);
                finalReducedMap.put(key, count);
            } else {
                finalReducedMap.put(key, reducedList.get(key));
            }
        }
    }
}
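The containsKey/get/put loop in aggregateInMemoryReduce can be written more compactly with Map.merge (Java 8+). The sketch below (AggregateSketch is a hypothetical name) uses merge and shows that partial maps from different ReduceActors can arrive in any order, since addition is commutative.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Sketch of AggregateActor's merge logic using Map.merge.
class AggregateSketch {
    private final Map<String, Integer> finalReducedMap = new HashMap<>();

    void aggregateInMemoryReduce(Map<String, Integer> reducedList) {
        // for each word, add the partial count to the running total (or insert it)
        reducedList.forEach((word, count) -> finalReducedMap.merge(word, count, Integer::sum));
    }

    Map<String, Integer> snapshot() {
        return new TreeMap<>(finalReducedMap); // sorted copy, handy for printing
    }

    public static void main(String[] args) {
        AggregateSketch agg = new AggregateSketch();
        Map<String, Integer> fromReducerA = new HashMap<>();
        fromReducerA.put("the", 2);
        fromReducerA.put("moor", 1);
        Map<String, Integer> fromReducerB = new HashMap<>();
        fromReducerB.put("the", 3);
        agg.aggregateInMemoryReduce(fromReducerB); // arrival order does not matter
        agg.aggregateInMemoryReduce(fromReducerA);
        System.out.println(agg.snapshot()); // {moor=1, the=5}
    }
}
```

Keeping finalReducedMap as a plain HashMap is safe inside AggregateActor because an actor processes its messages one at a time, so no two merges run concurrently.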