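Actor Model Series
The Akka Framework
Akka is a platform, inspired by Erlang, that makes it easier to build scalable, thread-safe concurrent applications. In most popular languages, concurrency is built on memory shared between threads, with synchronization used to prevent conflicting writes; Akka instead offers a concurrency model based on actors.
What is an actor?
Actors provide a simple, unified model:
Isolated units of computation
"Share nothing"
No synchronization anywhere
Asynchronous message passing
Immutable messages
Each actor has a mailbox (a queue) that holds its incoming messages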
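The properties listed above can be illustrated without Akka at all. The following is a minimal sketch in plain Java, assuming a hypothetical `CollectingActor` class invented for this illustration: its state is private, messages are enqueued asynchronously into a mailbox, and a single thread drains that mailbox one message at a time, so no locks are needed around the state. It is not how Akka implements actors internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration only; not Akka's implementation.
final class MailboxSketch {

    /** A tiny actor: private state, a mailbox, and one thread that drains it. */
    static final class CollectingActor {
        private static final Object STOP = new Object();        // poison pill
        private final BlockingQueue<Object> mailbox = new LinkedBlockingQueue<>();
        private final List<String> seen = new ArrayList<>();     // touched only by the worker thread
        private final Thread worker = new Thread(this::drain);

        CollectingActor() {
            worker.start();
        }

        /** Asynchronous send: enqueue the message and return immediately. */
        void tell(Object message) {
            mailbox.add(message);
        }

        private void drain() {
            try {
                for (Object m = mailbox.take(); m != STOP; m = mailbox.take()) {
                    seen.add(m.toString());   // one message at a time, so no locks
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        /** Lets the queued messages finish, stops the actor, and returns what it saw. */
        List<String> stopAndGet() {
            mailbox.add(STOP);
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return seen;
        }
    }

    static List<String> demo() {
        CollectingActor actor = new CollectingActor();
        actor.tell(42);
        actor.tell(3.14159);
        actor.tell(true);
        return actor.stopAndGet();
    }

    public static void main(String[] args) {
        System.out.println(demo());   // [42, 3.14159, true]
    }
}
```

Because the mailbox is a FIFO queue with a single consumer, messages from one sender are handled in the order they were sent.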
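Where do actors come from?
First defined by Carl Hewitt in 1973
Popularized by Erlang OTP (Open Telecom Platform)
Message passing is closer to the original intent of object orientation
Alan Kay said:
"OOP to me means only messaging, local retention and protection and hiding of state-process, and extreme late-binding of all things."
Why use actors?
Without actors, your CPU usage looks like this:
With actors, CPU usage is lower:
The author of Akka
Jonas Bonér
Background:
AspectWerkz
JRockit
Terracotta
Co-founder of Typesafe, together with Martin Odersky (May 2011)
Why use Akka?
Scale up (vertical scaling)
Akka actors are lightweight: a single system can comfortably hold thousands of them.
Threads are heavyweight, and context switches are relatively slow.
Scale out (horizontal scaling)
Actors can run remotely without any change to the code
Fault recovery and error handling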
Actors in Java
import akka.actor.UntypedActor;

public class ToStringActor extends UntypedActor {
    @Override
    public void onReceive(Object message) {
        // Print whatever arrives.
        System.out.println(message.toString());
    }
}
Sending messages to this actor:
import akka.actor.ActorRef;
import akka.actor.Actors;

ActorRef toString = Actors.actorOf(ToStringActor.class).start();
toString.sendOneWay(42);       // prints 42
toString.sendOneWay(3.14159);  // prints 3.14159
toString.sendOneWay(true);     // prints true
Receiving a message and replying:
public class DoubleItActor extends UntypedActor {
    @Override
    public void onReceive(Object message) {
        // Reply to the sender with the message text repeated twice.
        getContext().replyUnsafe(
            message.toString() +
            message.toString());
    }
}
Or:
getContext().replySafe(
Sending and waiting for the reply:
ActorRef doubleIt = Actors.actorOf(DoubleItActor.class).start();
System.out.println(
    doubleIt.sendRequestReply("foo"));        // prints foofoo
System.out.println(
    doubleIt.sendRequestReplyFuture("bar")
        .get());                              // prints barbar
System.out.println(
    doubleIt.sendRequestReplyFuture("bar")
        .await().result());                   // prints Some(barbar)
Futures can be mapped in monadic style, which would be simpler with the lambdas planned for Java 8:
System.out.println(
    doubleIt.sendRequestReplyFuture("meh")
        .map(new Function<String,Integer>() {
            public Integer apply(String param) {
                return param.length();
            }
        }).await().result());                 // prints Some(6)
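For comparison, here is roughly what the same send-and-map flow looks like with Java 8 lambdas and the standard `CompletableFuture`. This is a sketch under stated assumptions, not Akka code: the `DoubleIt` class below is a hypothetical stand-in for `DoubleItActor`, with a single-threaded executor playing the role of the mailbox, and `ask` is a name chosen for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for the Akka example above, using only the JDK.
final class AskSketch {

    /** Processes requests one at a time on its own thread, like an actor. */
    static final class DoubleIt {
        private final ExecutorService mailbox = Executors.newSingleThreadExecutor();

        /** Request-reply: returns at once with a future completed by the actor thread. */
        CompletableFuture<String> ask(Object message) {
            return CompletableFuture.supplyAsync(
                    () -> message.toString() + message.toString(), mailbox);
        }

        void stop() {
            mailbox.shutdown();
        }
    }

    /** Sends one message and blocks until the doubled reply arrives. */
    static String askOnce(Object message) {
        DoubleIt doubleIt = new DoubleIt();
        try {
            return doubleIt.ask(message).join();
        } finally {
            doubleIt.stop();
        }
    }

    /** Sends one message and maps the reply to its length before blocking. */
    static int lengthOfReply(Object message) {
        DoubleIt doubleIt = new DoubleIt();
        try {
            return doubleIt.ask(message)
                    .thenApply(String::length)   // the "map" step, now a one-liner
                    .join();
        } finally {
            doubleIt.stop();
        }
    }

    public static void main(String[] args) {
        System.out.println(askOnce("foo"));         // foofoo
        System.out.println(lengthOfReply("meh"));   // 6
    }
}
```

The anonymous `Function` class from the Akka snippet collapses into the method reference `String::length`, which is the simplification the text anticipates.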
One-way send with a reply-to actor:
ActorRef toString = actorOf(ToStringActor.class).start();
doubleIt.sendOneWay("foobar", toString);  // toString prints foobarfoobar
Anonymous actor:
ActorRef act = actorOf(new UntypedActorFactory() {
    public UntypedActor create() {
        return new UntypedActor() {
            public void onReceive(Object message) {
                System.out.println("Received : "
                    + message);
            }
        };
    }
}).start();
Actors in Scala
import akka.actor.Actor

class ToStringActor extends Actor {
  override def receive = {
    case message: Any =>
      println(message.toString)
  }
}
Sending messages:
val toString = Actor.actorOf(classOf[ToStringActor]).start
toString ! 42 // same as toString.!(42); prints 42
toString ! 3.14159 // prints 3.14159
toString ! true // prints true
Receiving a message and replying:
class DoubleItActor extends Actor {
  override def receive = {
    case message: Any =>
      // Reply to the sender with the message text repeated twice.
      self.reply(
        message.toString + message.toString)
  }
}
Or:
self.reply_?
Sending and waiting for the reply:
val doubleIt = Actor.actorOf(classOf[DoubleItActor]).start
println(doubleIt !! "foo") // prints Some(foofoo)
println((doubleIt !!! "bar").get) // prints barbar
println((doubleIt !!! "bar").await.result) // prints Some(barbar)
println((doubleIt !!! "meh")
  .map((x: String) => x.length).await.result) // prints Some(6)
One-way send with a reply-to actor:
val toString = Actor.actorOf(classOf[ToStringActor]).start
doubleIt.!("foobar")(Some(toString)) // toString is passed as the sender and prints foobarfoobar
Anonymous actor:
val fooActor = Actor.actorOf(
  new Actor {
    def receive = {
      case x => println("Foo: " + x)
    }
  }).start
Actors in Clojure:
(ns tfd.clojurefun.ClojureActor
  (:gen-class
    :extends akka.actor.UntypedActor))

(defn -onReceive [this msg]
  (println (.toString msg)))
Typed actors
public interface Counter {
    public void increment();
    public Integer getValue();
    public Future<Integer> getFutureValue();
}

public class CounterActor extends TypedActor
        implements Counter {
    private Integer value = 0;

    public void increment() {
        // Simulate slow work.
        try { Thread.sleep(1000); }
        catch (InterruptedException ex) { }
        value++;
    }

    public Integer getValue() {
        return value;
    }

    public Future<Integer> getFutureValue() {
        return future(value);
    }
}
Calling it:
Counter counter = TypedActor.newInstance(Counter.class,
    CounterActor.class, 2000);
counter.increment();
System.out.println(counter.getValue());   // prints 1
counter.increment();
System.out.println(
    counter.getFutureValue().get());       // prints 2
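One way to picture what a typed actor does is a dynamic proxy that turns every interface call into a message on a single-threaded mailbox. The sketch below uses only the JDK and is an assumption-laden illustration, not Akka's `TypedActor` implementation: `newInstance` and `CounterImpl` here are hypothetical names, void methods are treated as fire-and-forget, and all other methods block until the reply is ready.

```java
import java.lang.reflect.Proxy;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch of the typed-actor idea; not Akka's implementation.
final class TypedActorSketch {

    interface Counter {
        void increment();
        Integer getValue();
    }

    static final class CounterImpl implements Counter {
        private int value = 0;   // only ever touched by the mailbox thread

        public void increment() { value++; }

        public Integer getValue() { return value; }
    }

    /** Wraps target so that every interface call becomes a task on one mailbox thread. */
    static Counter newInstance(Counter target, ExecutorService mailbox) {
        return (Counter) Proxy.newProxyInstance(
                Counter.class.getClassLoader(),
                new Class<?>[] { Counter.class },
                (proxy, method, args) -> {
                    Callable<Object> call = () -> method.invoke(target, args);
                    Future<Object> reply = mailbox.submit(call);
                    // void methods are fire-and-forget; everything else waits for the reply
                    return method.getReturnType() == void.class ? null : reply.get();
                });
    }

    static int demo() {
        ExecutorService mailbox = Executors.newSingleThreadExecutor();
        try {
            Counter counter = newInstance(new CounterImpl(), mailbox);
            counter.increment();          // returns immediately
            counter.increment();
            return counter.getValue();    // queued behind both increments
        } finally {
            mailbox.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());   // 2
    }
}
```

Since the mailbox is processed in order, `getValue()` always observes the increments sent before it, which mirrors why the Akka example prints 1 even though `increment()` sleeps for a second.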
Remote actors
There are two approaches. The first is server-managed remoting.
On the remote server:
RemoteServerModule server =
    remote().start("localhost", 2553);
server.register("double-it",
    actorOf(DoubleItActor.class));
server.register("to-string",
    actorOf(ToStringActor.class));
On the client:
ActorRef doubleIt = remote()
    .actorFor("double-it", "localhost", 2553);
ActorRef toString = remote()
    .actorFor("to-string", "localhost", 2553);
doubleIt.sendOneWay("meh", toString);
Output on the remote server:
mehmeh
The second is client-managed remoting:
ActorRef remoteToString = remote().actorOf(ToStringActor.class, "localhost", 2553).start();
ActorRef localDoubleIt = Actors.actorOf(DoubleItActor.class).start();
remoteToString.sendOneWay("foo");
localDoubleIt.sendOneWay("bar", remoteToString);
Output on the remote server:
foo
barbar
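Event-based dispatching
Akka's default dispatcher is Dispatchers.globalExecutorBasedEventDrivenDispatcher
Each thread can serve many actors
Actors should not block.
Akka adds threads when needed, up to a configured maximum.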
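The idea that many actors share a few threads can be sketched in plain Java. In the hypothetical example below (not Akka's dispatcher), each `CountingActor` has its own mailbox but no thread of its own; when a message arrives, the actor is scheduled onto a shared fixed-size pool unless it is already running there. The class names and the `demo` parameters are assumptions made for this illustration.

```java
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of event-driven dispatch; not Akka's implementation.
final class SharedPoolSketch {
    static final Set<String> THREADS_USED = ConcurrentHashMap.newKeySet();

    static final class CountingActor {
        private final Queue<Object> mailbox = new ConcurrentLinkedQueue<>();
        private final AtomicBoolean scheduled = new AtomicBoolean(false);
        private final ExecutorService pool;
        private final CountDownLatch done;
        int count = 0;   // touched by one thread at a time

        CountingActor(ExecutorService pool, CountDownLatch done) {
            this.pool = pool;
            this.done = done;
        }

        void tell(Object message) {
            mailbox.add(message);
            schedule();
        }

        /** Puts the actor on the pool unless it is already running there. */
        private void schedule() {
            if (!mailbox.isEmpty() && scheduled.compareAndSet(false, true)) {
                pool.execute(this::drain);
            }
        }

        private void drain() {
            try {
                while (mailbox.poll() != null) {
                    count++;
                    THREADS_USED.add(Thread.currentThread().getName());
                    done.countDown();
                }
            } finally {
                scheduled.set(false);
                schedule();   // pick up messages that arrived after the last poll
            }
        }
    }

    /** Returns {messages processed, distinct pool threads used}. */
    static int[] demo(int actors, int messagesPerActor, int threads) {
        THREADS_USED.clear();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch done = new CountDownLatch(actors * messagesPerActor);
        CountingActor[] all = new CountingActor[actors];
        for (int i = 0; i < actors; i++) {
            all[i] = new CountingActor(pool, done);
        }
        for (int m = 0; m < messagesPerActor; m++) {
            for (CountingActor actor : all) {
                actor.tell(m);
            }
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        int total = 0;
        for (CountingActor actor : all) {
            total += actor.count;
        }
        return new int[] { total, THREADS_USED.size() };
    }

    public static void main(String[] args) {
        int[] result = demo(1000, 10, 4);
        System.out.println(result[0] + " messages on " + result[1] + " threads");
    }
}
```

This also shows why actors on such a dispatcher should not block: a blocked `drain` holds one of the few pool threads and starves every other actor waiting to be scheduled.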
(1) Using the priority event-based dispatcher
val act = actorOf(new Actor {
  def receive = {
    case x => println("Received : " + x)
  }
})
// Lower numbers are handled first: strings get 0, an Int is its own priority, anything else gets 50.
act.dispatcher =
  new PriorityExecutorBasedEventDrivenDispatcher("foo", PriorityGenerator {
    case _: String => 0
    case x: Int => x
    case _ => 50
  })
// Suspend the actor so that the messages queue up before any is handled.
act.start.dispatcher.suspend(act)
act ! 1.0
act ! "foo"
(0 to 9).map { x: Int => act ! (x * 10) }
act ! 2.0
act ! "bar"
Output:
Received : foo
Received : 0
Received : bar
Received : 10
Received : 20
Received : 30
Received : 40
Received : 1.0
Received : 50
Received : 2.0
Received : 60
Received : 70
Received : 80
Received : 90
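The ordering above can be reproduced with a priority queue used as the mailbox. The sketch below is plain Java rather than Akka: `priorityOf` mirrors the PriorityGenerator rules, and the `Envelope` class with its arrival sequence number is an assumption added here so that messages of equal priority keep their send order (the JDK's `PriorityBlockingQueue` does not guarantee that by itself).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical sketch of a priority mailbox; not Akka's implementation.
final class PriorityMailboxSketch {

    /** Same rule as the PriorityGenerator: a lower number is handled first. */
    static int priorityOf(Object message) {
        if (message instanceof String) return 0;
        if (message instanceof Integer) return (Integer) message;
        return 50;
    }

    /** A message together with its priority and arrival order. */
    static final class Envelope {
        final Object message;
        final int priority;
        final long seq;

        Envelope(Object message, int priority, long seq) {
            this.message = message;
            this.priority = priority;
            this.seq = seq;
        }
    }

    static List<String> demo() {
        PriorityBlockingQueue<Envelope> mailbox = new PriorityBlockingQueue<>(16,
                Comparator.comparingInt((Envelope env) -> env.priority)
                          .thenComparingLong(env -> env.seq));   // FIFO among equal priorities

        // Queue everything first, as the suspended actor above does.
        List<Object> sent = new ArrayList<>();
        sent.add(1.0);
        sent.add("foo");
        for (int x = 0; x <= 9; x++) {
            sent.add(x * 10);
        }
        sent.add(2.0);
        sent.add("bar");
        long seq = 0;
        for (Object message : sent) {
            mailbox.add(new Envelope(message, priorityOf(message), seq++));
        }

        // Then drain in priority order.
        List<String> handled = new ArrayList<>();
        Envelope next;
        while ((next = mailbox.poll()) != null) {
            handled.add("Received : " + next.message);
        }
        return handled;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Running `main` prints the same fourteen lines, in the same order, as the Akka output listed above.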
(2) Using the work-stealing event dispatcher
public class FooActor extends UntypedActor {
    // One dispatcher shared by every FooActor instance.
    public static MessageDispatcher dispatcher = Dispatchers
        .newExecutorBasedEventDrivenWorkStealingDispatcher("foobar", 5)
        .build();
    private static int currentId = 0;
    private final int instanceId;
    private int count = 0;

    public FooActor() {
        getContext().setDispatcher(dispatcher);
        instanceId = currentId++;
    }
The message handler:
    @Override
    public void onReceive(Object message) {
        System.out.printf(
            "Foo %d processed : %s (count = %d) on Thread : %s\n",
            instanceId,
            message.toString(),
            ++count,
            Thread.currentThread().getName()
        );
        // Instances with a higher id sleep longer, so faster instances end up taking more of the work.
        try { Thread.sleep(instanceId * 50 + 50); }
        catch (InterruptedException ex) { }
    }
}
Output:
Foo 0 processed : 0 (count = 1) on Thread : akka:event-driven:dispatcher:foobar-1
Foo 1 processed : 1 (count = 1) on Thread : akka:event-driven:dispatcher:foobar-2
Foo 0 processed : 2 (count = 2) on Thread : akka:event-driven:dispatcher:foobar-3
Foo 1 processed : 3 (count = 2) on Thread : akka:event-driven:dispatcher:foobar-4
Foo 0 processed : 4 (count = 3) on Thread : akka:event-driven:dispatcher:foobar-5
Foo 1 processed : 5 (count = 3) on Thread : akka:event-driven:dispatcher:foobar-6
Foo 0 processed : 6 (count = 4) on Thread : akka:event-driven:dispatcher:foobar-7
Foo 0 processed : 7 (count = 5) on Thread : akka:event-driven:dispatcher:foobar-8
Foo 1 processed : 8 (count = 4) on Thread : akka:event-driven:dispatcher:foobar-9
Foo 0 processed : 9 (count = 6) on Thread : akka:event-driven:dispatcher:foobar-10
Foo 1 processed : 93 (count = 40) on Thread : akka:event-driven:dispatcher:foobar-15
Foo 0 processed : 94 (count = 55) on Thread : akka:event-driven:dispatcher:foobar-13
Foo 0 processed : 95 (count = 56) on Thread : akka:event-driven:dispatcher:foobar-13
Foo 0 processed : 96 (count = 57) on Thread : akka:event-driven:dispatcher:foobar-16
Foo 1 processed : 97 (count = 41) on Thread : akka:event-driven:dispatcher:foobar-14
Foo 0 processed : 98 (count = 58) on Thread : akka:event-driven:dispatcher:foobar-4
Foo 1 processed : 99 (count = 42) on Thread : akka:event-driven:dispatcher:foobar-4
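The uneven counts in this output (the faster instance ends up with more messages) are what work stealing is meant to produce. The following plain-Java sketch illustrates the general technique under simplifying assumptions and is not Akka's dispatcher: two hypothetical `Worker` objects each own a deque, messages are dealt out evenly, and a worker whose own deque is empty takes a message from the tail of the other's.

```java
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of work stealing between two workers; not Akka's implementation.
final class WorkStealingSketch {

    static final class Worker implements Runnable {
        final Deque<Integer> mailbox = new ConcurrentLinkedDeque<>();
        final long delayMillis;            // simulated cost per message
        final AtomicInteger remaining;     // messages not yet processed by anyone
        Worker other;                      // the peer we may steal from
        int processed = 0;

        Worker(long delayMillis, AtomicInteger remaining) {
            this.delayMillis = delayMillis;
            this.remaining = remaining;
        }

        @Override
        public void run() {
            while (remaining.get() > 0) {
                Integer message = mailbox.pollFirst();       // own mailbox first
                if (message == null) {
                    message = other.mailbox.pollLast();      // idle: steal from the peer
                }
                if (message == null) {
                    Thread.yield();                          // nothing available right now
                    continue;
                }
                pause(delayMillis);
                processed++;
                remaining.decrementAndGet();
            }
        }
    }

    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Returns {messages handled by the fast worker, messages handled by the slow worker}. */
    static int[] demo(int total) {
        AtomicInteger remaining = new AtomicInteger(total);
        Worker fast = new Worker(1, remaining);
        Worker slow = new Worker(3, remaining);
        fast.other = slow;
        slow.other = fast;
        for (int i = 0; i < total; i++) {
            (i % 2 == 0 ? fast : slow).mailbox.addLast(i);   // deal messages out evenly
        }
        Thread a = new Thread(fast);
        Thread b = new Thread(slow);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { fast.processed, slow.processed };
    }

    public static void main(String[] args) {
        int[] counts = demo(100);
        System.out.println("fast = " + counts[0] + ", slow = " + counts[1]);
    }
}
```

The exact split varies from run to run, but every message is processed exactly once, and the faster worker typically ends up with the larger share.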
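(3) Using the thread-based dispatcher
In Java:
getContext().setDispatcher(Dispatchers.newThreadBasedDispatcher(getContext()));
In Scala:
self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)
Poor scalability and performance
Suited to background "daemon" actors
Suited to low-frequency messages
Suited to actors that block
Routing and load balancing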
InfiniteIterator<ActorRef> iter =
    new CyclicIterator<ActorRef>(Arrays.asList(
        actorOf(FooActor.class).start(),
        actorOf(BarActor.class).start()
    ));
// Each message goes to the next actor in the cycle.
for (Integer value : Arrays.asList(4, 8, 15, 16, 23, 42)) {
    iter.next().sendOneWay(value);
}
Note the order of the output:
Bar: 8
Foo: 4
Bar: 16
Foo: 15
Bar: 42
Foo: 23
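The round-robin selection itself is simple. A minimal plain-Java sketch follows, assuming a hypothetical `Cyclic` class that plays the role of the cyclic iterator and two `Consumer` callbacks standing in for the Foo and Bar actors. Unlike the actors above, the callbacks here run synchronously, so the log comes out in strict send order, whereas the asynchronous actors may interleave their output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of round-robin routing; not Akka's CyclicIterator.
final class RoundRobinSketch {

    /** Endless iterator that cycles over a fixed list of routees. */
    static final class Cyclic<T> {
        private final List<T> items;
        private int next = 0;

        Cyclic(List<T> items) {
            this.items = new ArrayList<>(items);
        }

        T next() {
            T item = items.get(next);
            next = (next + 1) % items.size();   // wrap around at the end
            return item;
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Consumer<Integer> foo = value -> log.add("Foo: " + value);
        Consumer<Integer> bar = value -> log.add("Bar: " + value);
        Cyclic<Consumer<Integer>> routees = new Cyclic<>(Arrays.asList(foo, bar));
        for (Integer value : Arrays.asList(4, 8, 15, 16, 23, 42)) {
            routees.next().accept(value);       // each message goes to the next routee
        }
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Each routee receives the same values as in the actor output (Foo gets 4, 15, 23 and Bar gets 8, 16, 42); only the interleaving differs.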
ActorRef router = actorOf(new UntypedActorFactory() {
    public UntypedActor create() {
        return new UntypedLoadBalancer() {
            // The balancer forwards each message to the next actor in the cycle.
            private InfiniteIterator<ActorRef> iterator =
                new CyclicIterator<ActorRef>(Arrays.asList(
                    actorOf(FooActor.class).start(),
                    actorOf(BarActor.class).start()));

            public InfiniteIterator<ActorRef> seq() {
                return iterator;
            }
        };
    }
}).start();
for (Integer value :
        Arrays.asList(4, 8, 15, 16, 23, 42)) {
    router.sendOneWay(value);
}
Output:
Foo received '3' count = 1
Foo received '4' count = 2
Foo received '6' count = 3
Foo received '8' count = 4
Foo received '9' count = 5
Foo received '10' count = 6
Foo received '11' count = 7
Foo received '12' count = 8
Foo received '13' count = 9
Foo received '14' count = 10
Foo received '15' count = 11
Bar received '1' count = 1
Bar received '2' count = 2
Bar received '5' count = 3
Bar received '7' count = 4