Actors模型专题

Akka性能测试

  源码下载

Akka承诺高并发。那么有什么更好的方式来模拟呢?,让它处理1000万信息花费多少时间,使用普通硬件和软件并且没有任何低级别的微调。这个例子使用Akka产生1000万条信息,计算处理它们所花费的时间。该程序演示了路由的概念,roundrobinrouter是用于将负载分发到一组worker工作者上面,类似负载平衡概念。

运行的硬件:英特尔酷睿4核 4 GB RAM的iMac,分配1024M给JVM堆Heap空间,在23秒内处理1000万信息,每秒处理400K信息。

该案例的代码结果如下,有两个Actor: workerActor和jobCOntrollerActor:

akka性能

workerActor使用10个工作者运行,这个10个分配策略使用RoundRobinRouter,也就是轮流策略;jobControllerActor是收集响应消息直到1000万数量达到。

入口调用是:

new ApplicationManagerSystem().generateLoad()

ApplicationManagerSystem代码如下:

public ApplicationManagerSystem() {
 
              final int no_of_workers = 10;
 
              system = ActorSystem.create("LoadGeneratorApp");
 
              final ActorRef appManager = system.actorOf(
                            new Props(new UntypedActorFactory() {
                                   public UntypedActor create() {
                                          return new JobControllerActor(no_of_msgs);
                                   }
                            }), "jobController");//启动jobController
 
              router = system.actorOf(new Props(new UntypedActorFactory() {
                     public UntypedActor create() {
                            return new WorkerActor(appManager);
                     }
              }).withRouter(new RoundRobinRouter(no_of_workers)));//启动带RoundRobinRouter的workerActor
       }
 
       private void generateLoad() {//开始产生1000万消息
              for (int i = no_of_msgs; i >= 0; i--) {
                     router.tell("Job Id " + i + "# send");
              }
              System.out.println("All jobs sent successfully");
       }

产生1000万消息首先调用WorkerActor,以发送消息方式。

workerActor如下:

public class WorkerActor extends UntypedActor {
 
       private ActorRef jobController;//jobController是接受者
 
       @Override //接受到generateLoad的发送消息
       public void onReceive(Object message) throws Exception {
              // using scheduler to send the reply after 1000 milliseconds
              getContext()
                            .system()
                            .scheduler()
                            .scheduleOnce(Duration.create(1000, TimeUnit.MILLISECONDS),
                                          jobController, "Done");//jobController是接受者
       }
 
       public WorkerActor(ActorRef inJobController) {
              jobController = inJobController;
       }
}

jobController的代码如下:

public class JobControllerActor extends UntypedActor {
 
       int count = 0;
       long startedTime = System.currentTimeMillis();
       int no_of_msgs = 0;
 
       @Override //接受来自workActor消息
       public void onReceive(Object message) throws Exception {
 
              if (message instanceof String) {
                     if (((String) message).compareTo("Done") == 0) {
                            count++;//计数增加
                            if (count == no_of_msgs) {//计数到达1000万结束,统计时间
                                   long now = System.currentTimeMillis();
                                   System.out.println("All messages processed in "
                                                 + (now - startedTime) / 1000 + " seconds");
 
                                   System.out.println("Total Number of messages processed "
                                                 + count);
                                   getContext().system().shutdown();
                            }
                     }
              }
 
       }
}

 

使用Akka实现MapReduce的单词计数