Actors模型专题

Akka教程

  AKKA框架是一个平台,灵感来自ERlang,能更轻松地开发可扩展,实现多线程安全应用。虽然在大多数流行的语言并发是基于多线程之间的共享内存,使用同步方法防止写争夺,Akka提供的并发模型基于Actors。

  Actors是一个轻量级的对象,通过发送消息实现交互。每个Actors在同一时间处理最多一个消息,可以发送消息给其他Actors。在同一时间可以于一个Java虚拟机存在数以百万计的参与者,构架是一个分层的父层(管理) - 子层,其中父层监控子层的行为。还可以很容易地扩展Actor运行在集群中各个节点之间 - 无需修改一行代码。每个演员都可以有内部状态(字段/变量) ,但通信只能通过消息传递,不会有共享数据结构(计数器,队列) 。Akka框架支持两种语言Java和Scala,

 

HellowWorld

  以HellowWorld编写一个简单Actors:

public class HelloWorld extends UntypedActor
{
    public static void main( String[] args )
    {
        //create and start the actor 创建启动Actor
            ActorRef actor = actorOf(HelloWorld.class).start();
       
        //send the message to the actor and wait for response 发送消息并等待回应
            Object response = actor.ask("Munish").get();
       
            //print the response打印消息
        System.out.println(response);
       
        //stop the actor 停止Actor
        actor.stop();
    }

        @Override
        public void onReceive(Object message) throws Exception {               
                //receive and reply to the message received 主要方法接受并回复
                getContext().tryReply("Hello " + message);
               
        }
}

以上是一个简单的启动Akka,并发送消息等待回复的案例。这是默认在一个JVM中,我们也可以很容易扩展到几个JVM之间相互调用,如下:

import akka.actor.UntypedActor;
import static akka.actor.Actors.*;

public class HelloWorldServer extends UntypedActor {

        public void onReceive(Object msg) {
                getContext().tryReply("Hello " + msg);
        }

        /**
         * @param args
         */
        public static void main(String[] args) {
                // start the service启动这个Actor
                remote().start("localhost", 2552).register("hello-service",
                                actorOf(HelloWorldServer.class));

        }

}

这个Actor是接受字符串打印出来,启动这个服务。我们可以通过端口访问这个服务:

public class HelloWorldClient {
        public static void main(String[] args) {
                // client code
                ActorRef actor = remote().actorFor(
                  "hello-service", "localhost", 2552);
                Object response = actor.ask("Munish").get();
                System.out.println(response);
        }

}

以上Helloworld源码下载

 

客户端/服务器

下面我们看看一个完整的客户端/服务器方式如何编写Actor:

public class ServerActor extends UntypedActor {

        LoggingAdapter log = Logging.getLogger(getContext().system(), this);
        private static int instanceCounter = 0;

        @Override
        public void preStart() {
                instanceCounter++;
                log.info("Starting ServerActor instance #" + instanceCounter
                                + ", hashcode #" + this.hashCode());
        }

        @Override
        public void onReceive(Object message) throws Exception {
                if (message instanceof String) {
                        getSender().tell(message + " got something");
                } else if (message instanceof PoisonPill) {
                        getContext().system().shutdown();
                }
        }

        @Override
        public void postStop() {
                log.info("Stoping ServerActor instance #" + instanceCounter
                                + ", hashcode #" + this.hashCode());
                instanceCounter--;
        }

}

有了两个启动和关闭方法:preStart() postStop()

我们使用一个普通Java类启动它:

public class ServerActorSystem implements Bootable {

        private LoggingAdapter log = null;
        private ActorSystem system;

        /*
         * default constructor
         */
        public ServerActorSystem() {
                // load the configuration 加载配置
                system = ActorSystem.create("ServerSys", ConfigFactory.load()
                                .getConfig("ServerSys"));
                log = Logging.getLogger(system, this);
                // create the actor
                @SuppressWarnings("unused")
                ActorRef actor = system.actorOf(new Props(ServerActor.class),
                                "serverActor");
        }

        public void shutdown() {
                log.info("Shutting down the ServerActorSystem");

        }

        public void startup() {
                // TODO Auto-generated method stub

        }

        public static void main(String[] args) {

                new ServerActorSystem();

        }

}

客户端的Actor是访问这个remote的Actor:

public class ClientActor extends UntypedActor {

        LoggingAdapter log = Logging.getLogger(getContext().system(), this);
        private ActorRef remote;//远程Actor

        public ClientActor(ActorRef inActor) {
                remote = inActor;
        }

        @Override
        public void onReceive(Object message) throws Exception {

                if (message instanceof String) {
                        if (((String) message).startsWith("Start") == true) {
                                log.info("Sending message to server - message# Hi there");
                                remote.tell("Hi there", getSelf());
                        } else {
                                log.info("Message received from Server -> " + message);
                        }
                }

        }

}

客户端同样需要一个启动类:

public class ClientActorSystem implements Bootable {

        private LoggingAdapter log = null;
        private ActorSystem system;
        private ActorRef actor;
        private ActorRef remoteActor;

        /*
         * Default Constructor
         */
        public ClientActorSystem() {
                system = ActorSystem.create("ClientSys", ConfigFactory.load()
                                .getConfig("ClientSys"));
                log = Logging.getLogger(system, this);

        }

        /*
         * Method demonstrates how to get a reference to the ServerActor deployed
         * on a remote node and how to pass the message to the same.
         * Key here is system.actorFor()
         */
        @SuppressWarnings("serial")
        public void remoteActorRefDemo() {
                log.info("Creating a reference to remote actor");
                // creating a reference to the remote ServerActor
                // by passing the complete remote actor path
                remoteActor = system
                                .actorFor("akka://ServerSys@127.0.0.1:2552/user/serverActor");

                log.info("ServerActor with hashcode #" + remoteActor.hashCode());
               
                // create a local actor and pass the reference of the remote actor
                actor = system.actorOf(new Props(new UntypedActorFactory() {
                        public UntypedActor create() {
                                return new ClientActor(remoteActor);
                        }
                }));
                // send a message to the local client actor
                actor.tell("Start-RemoteActorRef");
        }

....

}

这里remoteActorRefDemo方法展示如何获得远程部署的ServerActor,以及如何发送消息。

远程Actor有不同的访问方式,下面用远程部署机制创建一个Actor:

/*
         * Method demonstrates how to create an instance of ServerActor on remote
         * node and how to pass the message to the same. This method demonstrates one
         * way to create the server node address
         * Key here is system.actorOf()
         *
         * refer to the ServerActorSystem for information on new server actor creation
         * identified via hashcode's
         */
        @SuppressWarnings("serial")
        public void remoteActorCreationDemo1() {
                log.info("Creating a actor using remote deployment mechanism");

                // create the address object that points to the remote server创建一个地址指向远程
                Address addr = new Address("akka", "ServerSys", "127.0.0.1", 2552);

                // creating the ServerActor on the specified remote server 创建一个ServerActor
                final ActorRef serverActor = system.actorOf(new Props(ServerActor.class)
                                .withDeploy(new Deploy(new RemoteScope(addr))));

                // create a local actor and pass the reference of the remote actor
                actor = system.actorOf(new Props(new UntypedActorFactory() {
                        public UntypedActor create() {
                                return new ClientActor(serverActor);
                        }
                }));
                // send a message to the local client actor
                actor.tell("Start-RemoteActorCreationDemo1");
        }
       

这里ServerActor有点类似远程代理类。下面再演示另外一种版本:

/*
         * Method demonstrates how to create an instance of ServerActor on remote
         * node and how to pass the message to the same. This method demonstrates an
         * alternate way to create the server node address
         * Key here is system.actorOf()
         *
         * Refer to the ServerActorSystem for information on new server actor creation
         * identified via hashcode's
         */
        @SuppressWarnings("serial")
        public void remoteActorCreationDemo2() {
                log.info("Creating a actor with remote deployment");

                // alternate way to create the address object that points to the remote
                // server //创建指向远程服务的地址对象
                Address addr = AddressFromURIString
                                .parse("akka://ServerSys@127.0.0.1:2552");

                // creating the ServerActor on the specified remote server
                final ActorRef serverActor = system.actorOf(new Props(ServerActor.class)
                                .withDeploy(new Deploy(new RemoteScope(addr))));

                // create a local actor and pass the reference of the remote actor
                actor = system.actorOf(new Props(new UntypedActorFactory() {
                        public UntypedActor create() {
                                return new ClientActor(serverActor);
                        }
                }));
                // send a message to the local client actor
                actor.tell("Start-RemoteActorCreationDemo2");
        }

以上多个远程Actor演示源码下载

 

Supervisoin管理策略

  下面以邮件发送为案例使用Java+Akka实现:Akka的父层可以对子层实现管理策略,如下代码:

class EmailServiceActor extends UntypedActor { 
   private static SupervisorStrategy strategy = 
       new OneForOneStrategy(10, Duration.create("1 minute"), 
           new Function<Throwable, Directive>() { 
             @Override 
             public Directive apply(Throwable t) { 
               if (t instanceof MessagingException) { 
                 return resume(); 
               } else if (t instanceof Exception) { 
                 return stop(); 
               } else { 
                 return escalate(); 
               } 
             } 
           }); 
   @Override 
   public void onReceive(Object message) { 
     getContext().actorOf(new Props(EmailServiceWorker.class)).tell(message, self()); 
   } 
   @Override 
   public SupervisorStrategy supervisorStrategy() { 
     return strategy; 
   } 
 }

EmailServiceActor是一个父Actors,负责管理策略supervision ,定义了一个OneForOneStrategy。在apply方法中实现的是对Actor出错Exception后的处理方式,保证在Actor发生错误以后不影响整个系统的运行,也就是容错性。

其onReceive方法是将接受到的消息传递给子Actor EmailServiceWorker:

class EmailServiceWorker extends UntypedActor { 
   @Override 
   public void onReceive(Object message) { 
     try { 
       EmailService emailService = new EmailService(); 
       emailService.sendEmail(); 
     } catch (IOException e) { 
       e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. 
     } catch (MessagingException e) { 
       e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. 
     } 
   } 
   @Override 
   public void preStart() { 
 //    getContext().system().scheduler().scheduleOnce(Duration.create(5, TimeUnit.SECONDS), self(), "emailWorker", getContext().system().dispatcher(), null); 
   } 
   @Override 
   public void postStop() { 
   } 
 }

  在这个Actor中,真正实现调用EmailService发送消息。  EmailService源码这里

  AKKA提供了两种管理策略:One-For-One 或 All-For-One,用来监视Ators建立一个容错模型。按这里查看或下载三个案例源码

 

使用Akka发送1000万消息

并发专题

AKKA框架快速介绍

Akka的产品化应用经验分享 

Scala入门之函数编程