Actors模型专题
Akka教程
AKKA框架是一个平台,灵感来自ERlang,能更轻松地开发可扩展,实现多线程安全应用。虽然在大多数流行的语言并发是基于多线程之间的共享内存,使用同步方法防止写争夺,Akka提供的并发模型基于Actors。
Actors是一个轻量级的对象,通过发送消息实现交互。每个Actors在同一时间处理最多一个消息,可以发送消息给其他Actors。在同一时间可以于一个Java虚拟机存在数以百万计的参与者,构架是一个分层的父层(管理) - 子层,其中父层监控子层的行为。还可以很容易地扩展Actor运行在集群中各个节点之间 - 无需修改一行代码。每个演员都可以有内部状态(字段/变量) ,但通信只能通过消息传递,不会有共享数据结构(计数器,队列) 。Akka框架支持两种语言Java和Scala,
HellowWorld
以HellowWorld编写一个简单Actors:
public class HelloWorld extends UntypedActor
{
public static void main( String[] args )
{
//create and start the actor 创建启动Actor
ActorRef actor = actorOf(HelloWorld.class).start();
//send the message to the actor and wait for response 发送消息并等待回应
Object response = actor.ask("Munish").get();
//print the response打印消息
System.out.println(response);
//stop the actor 停止Actor
actor.stop();
}
@Override
public void onReceive(Object message) throws Exception {
//receive and reply to the message received 主要方法接受并回复
getContext().tryReply("Hello " + message);
}
}
以上是一个简单的启动Akka,并发送消息等待回复的案例。这是默认在一个JVM中,我们也可以很容易扩展到几个JVM之间相互调用,如下:
import akka.actor.UntypedActor;
import static akka.actor.Actors.*;
public class HelloWorldServer extends UntypedActor {
public void onReceive(Object msg) {
getContext().tryReply("Hello " + msg);
}
/**
* @param args
*/
public static void main(String[] args) {
// start the service启动这个Actor
remote().start("localhost", 2552).register("hello-service",
actorOf(HelloWorldServer.class));
}
}
这个Actor是接受字符串打印出来,启动这个服务。我们可以通过端口访问这个服务:
public class HelloWorldClient {
public static void main(String[] args) {
// client code
ActorRef actor = remote().actorFor(
"hello-service", "localhost", 2552);
Object response = actor.ask("Munish").get();
System.out.println(response);
}
}
客户端/服务器
下面我们看看一个完整的客户端/服务器方式如何编写Actor:
public class ServerActor extends UntypedActor {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
private static int instanceCounter = 0;
@Override
public void preStart() {
instanceCounter++;
log.info("Starting ServerActor instance #" + instanceCounter
+ ", hashcode #" + this.hashCode());
}
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof String) {
getSender().tell(message + " got something");
} else if (message instanceof PoisonPill) {
getContext().system().shutdown();
}
}
@Override
public void postStop() {
log.info("Stoping ServerActor instance #" + instanceCounter
+ ", hashcode #" + this.hashCode());
instanceCounter--;
}
}
有了两个启动和关闭方法:preStart() postStop()
我们使用一个普通Java类启动它:
public class ServerActorSystem implements Bootable {
private LoggingAdapter log = null;
private ActorSystem system;
/*
* default constructor
*/
public ServerActorSystem() {
// load the configuration 加载配置
system = ActorSystem.create("ServerSys", ConfigFactory.load()
.getConfig("ServerSys"));
log = Logging.getLogger(system, this);
// create the actor
@SuppressWarnings("unused")
ActorRef actor = system.actorOf(new Props(ServerActor.class),
"serverActor");
}
public void shutdown() {
log.info("Shutting down the ServerActorSystem");
}
public void startup() {
// TODO Auto-generated method stub
}
public static void main(String[] args) {
new ServerActorSystem();
}
}
客户端的Actor是访问这个remote的Actor:
public class ClientActor extends UntypedActor {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
private ActorRef remote;//远程Actor
public ClientActor(ActorRef inActor) {
remote = inActor;
}
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof String) {
if (((String) message).startsWith("Start") == true) {
log.info("Sending message to server - message# Hi there");
remote.tell("Hi there", getSelf());
} else {
log.info("Message received from Server -> " + message);
}
}
}
}
客户端同样需要一个启动类:
public class ClientActorSystem implements Bootable {
private LoggingAdapter log = null;
private ActorSystem system;
private ActorRef actor;
private ActorRef remoteActor;
/*
* Default Constructor
*/
public ClientActorSystem() {
system = ActorSystem.create("ClientSys", ConfigFactory.load()
.getConfig("ClientSys"));
log = Logging.getLogger(system, this);
}
/*
* Method demonstrates how to get a reference to the ServerActor deployed
* on a remote node and how to pass the message to the same.
* Key here is system.actorFor()
*/
@SuppressWarnings("serial")
public void remoteActorRefDemo() {
log.info("Creating a reference to remote actor");
// creating a reference to the remote ServerActor
// by passing the complete remote actor path
remoteActor = system
.actorFor("akka://ServerSys@127.0.0.1:2552/user/serverActor");
log.info("ServerActor with hashcode #" + remoteActor.hashCode());
// create a local actor and pass the reference of the remote actor
actor = system.actorOf(new Props(new UntypedActorFactory() {
public UntypedActor create() {
return new ClientActor(remoteActor);
}
}));
// send a message to the local client actor
actor.tell("Start-RemoteActorRef");
}
....
}
这里remoteActorRefDemo方法展示如何获得远程部署的ServerActor,以及如何发送消息。
远程Actor有不同的访问方式,下面用远程部署机制创建一个Actor:
/*
* Method demonstrates how to create an instance of ServerActor on remote
* node and how to pass the message to the same. This method demonstrates one
* way to create the server node address
* Key here is system.actorOf()
*
* refer to the ServerActorSystem for information on new server actor creation
* identified via hashcode's
*/
@SuppressWarnings("serial")
public void remoteActorCreationDemo1() {
log.info("Creating a actor using remote deployment mechanism");
// create the address object that points to the remote server创建一个地址指向远程
Address addr = new Address("akka", "ServerSys", "127.0.0.1", 2552);
// creating the ServerActor on the specified remote server 创建一个ServerActor
final ActorRef serverActor = system.actorOf(new Props(ServerActor.class)
.withDeploy(new Deploy(new RemoteScope(addr))));
// create a local actor and pass the reference of the remote actor
actor = system.actorOf(new Props(new UntypedActorFactory() {
public UntypedActor create() {
return new ClientActor(serverActor);
}
}));
// send a message to the local client actor
actor.tell("Start-RemoteActorCreationDemo1");
}
这里ServerActor有点类似远程代理类。下面再演示另外一种版本:
/*
* Method demonstrates how to create an instance of ServerActor on remote
* node and how to pass the message to the same. This method demonstrates an
* alternate way to create the server node address
* Key here is system.actorOf()
*
* Refer to the ServerActorSystem for information on new server actor creation
* identified via hashcode's
*/
@SuppressWarnings("serial")
public void remoteActorCreationDemo2() {
log.info("Creating a actor with remote deployment");
// alternate way to create the address object that points to the remote
// server //创建指向远程服务的地址对象
Address addr = AddressFromURIString
.parse("akka://ServerSys@127.0.0.1:2552");
// creating the ServerActor on the specified remote server
final ActorRef serverActor = system.actorOf(new Props(ServerActor.class)
.withDeploy(new Deploy(new RemoteScope(addr))));
// create a local actor and pass the reference of the remote actor
actor = system.actorOf(new Props(new UntypedActorFactory() {
public UntypedActor create() {
return new ClientActor(serverActor);
}
}));
// send a message to the local client actor
actor.tell("Start-RemoteActorCreationDemo2");
}
Supervisoin管理策略
下面以邮件发送为案例使用Java+Akka实现:Akka的父层可以对子层实现管理策略,如下代码:
class EmailServiceActor extends UntypedActor {
private static SupervisorStrategy strategy =
new OneForOneStrategy(10, Duration.create("1 minute"),
new Function<Throwable, Directive>() {
@Override
public Directive apply(Throwable t) {
if (t instanceof MessagingException) {
return resume();
} else if (t instanceof Exception) {
return stop();
} else {
return escalate();
}
}
});
@Override
public void onReceive(Object message) {
getContext().actorOf(new Props(EmailServiceWorker.class)).tell(message, self());
}
@Override
public SupervisorStrategy supervisorStrategy() {
return strategy;
}
}
EmailServiceActor是一个父Actors,负责管理策略supervision ,定义了一个OneForOneStrategy。在apply方法中实现的是对Actor出错Exception后的处理方式,保证在Actor发生错误以后不影响整个系统的运行,也就是容错性。
其onReceive方法是将接受到的消息传递给子Actor EmailServiceWorker:
class EmailServiceWorker extends UntypedActor {
@Override
public void onReceive(Object message) {
try {
EmailService emailService = new EmailService();
emailService.sendEmail();
} catch (IOException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
} catch (MessagingException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
}
@Override
public void preStart() {
// getContext().system().scheduler().scheduleOnce(Duration.create(5, TimeUnit.SECONDS), self(), "emailWorker", getContext().system().dispatcher(), null);
}
@Override
public void postStop() {
}
}
在这个Actor中,真正实现调用EmailService发送消息。 EmailService源码这里
AKKA提供了两种管理策略:One-For-One 或 All-For-One,用来监视Ators建立一个容错模型。按这里查看或下载三个案例源码