Actors模型专题

Akka框架

  AKKA框架是一个平台,灵感来自ERlang,能更轻松地开发可扩展,实现多线程安全应用。虽然在大多数流行的语言并发是基于多线程之间的共享内存,使用同步方法防止写争夺,Akka提供的并发模型基于Actors。

什么是Actor?

Actors提供一个简单统一模型:
隔离计算实体
"Share nothing"
没有任何地方同步
异步消息传递
不可变的消息
Actors有类似消息的mailbox / queue

Actor来自于哪里?

首先是由Carl Hewitt在1973定义
由Erlang OTP (Open Telecom Platform) 推广
消息传递更加符合面向对象的原始意图

Alan Kay说:

"OOP 对于我来说仅仅意味着消息, 保护隐藏在内部进行状态处理,然后在后期和外界其他事物进行绑定。"

为什么使用Actor?

没有使用Actor,你的CPU占用如下:

使用Actor会降低CPU占用:

 

Akka的作者

Jonas Bonér

经历:

AspectWerkz
JRockit
Terracotta
Typesafe 公司创始人,与Martin Oderksy (May 2011)一起

 

为什么使用Akka?

垂直扩展Scale up

Akka actors是轻量,能够平均在一个系统创建数千个。
线程是重量的,场景切换相当慢。

横向扩展Scale out

Actor能够在代码没有任何修改时在远程运行
失败恢复和错误处理

 

Actor的Java代码

import akka.actor.UntypedActor;

public class ToStringActor extends UntypedActor {
  @Override
  public void onReceive(Object message) {
    System.out.println(message.toString());
  }
}

使用这个Actor发送消息:

ActorRef toString = Actors.actorOf(ToStringActor.class).start();

toString.sendOneWay(42);
42

toString.sendOneWay(3.14159);
3.14159

toString.sendOneWay(true);
true

接受消息:

public class DoubleItActor extends UntypedActor {
  @Override
  public void onReceive(Object message) {
    getContext().replyUnsafe(
        message.toString() +
        message.toString());
  }
}

Or:
getContext().replySafe(

发送且等答复:

ActorRef doubleIt = Actors.actorOf(DoubleItActor.class).start();

System.out.println( doubleIt.sendRequestReply("foo"));
foofoo

System.out.println( doubleIt.sendRequestReplyFuture("bar") .get());
barbar
System.out.println( doubleIt.sendRequestReplyFuture("bar") .await().result());
Some(barbar)

如果在Java8 引入Monad将简单:

System.out.println(
  doubleIt.sendRequestReplyFuture("meh")
    .map(new Function<String,Integer>() {
        public Integer apply(String param) {
          return param.length();
    }
  }).await().result());
Some(6)

单向回复:

ActorRef toString = actorOf(ToStringActor.class) .start();

doubleIt.sendOneWay("foobar", toString);
foobarfoobar

匿名Actor:

ActorRef act = actorOf(new UntypedActorFactory() {
  public UntypedActor create() {
    return new UntypedActor() {
      public void onReceive(Object message) {
        System.out.println("Received : "
        + message);
    }
  };
}
}).start();

 

Scala的Actor代码

import akka.actor.{Actor}

class ToStringActor extends Actor {
  override def receive = {
    case message:Any =>
        println(message.toString)
    }
}

发送消息:

val toString = Actor.actorOf(classOf[ToStringActor]).start

toString ! 42 // same as toString.!(42)
42

ToString ! 3.14159
3.14159

toString ! true
true

接受消息:

class DoubleItActor extends Actor {
  override def receive = {
    case message:Any =>
      self.reply(
        message.toString + message.toString)
    }
}

Or:
self.reply_?

发送且等答复:

val doubleIt = Actors.actorOf(classOf[DoubleItActor]).start

println(doubleIt !! "foo")
Some(foofoo)

println((doubleIt !!! "bar").get)
barbar

println((doubleIt !!! "bar").await.result)
Some(barbar)

println((doubleIt !!! "meh") .map((x:String) => x.length).await.result)
Some(6)

单向回复:

val toString = Actors.actorOf(classOf[ToStringActor]).start

(double ! sendOneWay)(toString)
foobarfoobar

匿名Actor:

val fooActor = Actor.actorOf(
    new Actor {
        def receive = {
            case x => println("Foo: " + x)
    }
}).start

 

Clojure的Actor代码:

(ns tfd.clojurefun.ClojureActor
  (:gen-class
    :extends akka.actor.UntypedActor
  )
)

(defn -onReceive [this msg]
  (println (.toString msg))
)

 

Typed actors

public interface Counter {
  public void increment();
  public Integer getValue();
  public Future<Integer> getFutureValue();
}

public class CounterActor extends TypedActor
implements Counter {
  private Integer value = 0;

  public void increment() {
    try { Thread.sleep(1000); }
    catch (InterruptedException ex) { }
      value++;
  }

  public Integer getValue() {
    return value;
  }

  public Future<Integer> getFutureValue() {
    return future(value);
  }
}

调用运行:

Counter counter = TypedActor.newInstance(Counter.class, CounterActor.class, 2000);
counter.increment();

System.out.println(counter.getValue());
1

counter.increment();

System.out.println( counter.getFutureValue().get());
2

 

远程Actor

两种方式:下面是服务端管理的远程:

On Remote Server:
RemoteServerModule server = remote().start("localhost", 2553);

server.register("double-it", actorOf(DoubleItActor.class));
server.register("to-string", actorOf(ToStringActor.class));

On Client:
ActorRef doubleIt = remote() .actorFor("double-it", "localhost", 2553);
ActorRef toString = remote() .actorFor("to-string", "localhost", 2553);

doubleIt.sendOneWay("meh", toString);

On Remote Server结果:
mehmeh

客户端管理的远程:

ActorRef remoteToString = remote().actorOf(ToStringActor.class, "localhost", 2553).start();

ActorRef localDoubleIt = Actors.actorOf(DoubleItActor.class).start();

remoteToString.sendOneWay("foo");

localDoubleIt.sendOneWay("bar", remoteToString);

On Remote Server输出:
foo
barbar

 

基于事件的分发

Akka的缺省分发器:Dispatchers.globalExecutorBasedEventDrivenDispatcher
每个线程可以有很多actors
Actors 不应该堵塞。
Akka如果需要能跨线程,有最大数限制。

(1)使用Priority event based dispatcher

val act = actorOf(new Actor {
  def receive = { case x => println("Received : " + x)
}})

act.dispatcher = new PriorityExecutorBasedEventDrivenDispatcher("foo", PriorityGenerator {
  case _:String => 0
  case x:Int => x
  case _ => 50
})

act.start.dispatcher.suspend(act)

act ! 1.0
act ! "foo"
(0 to 9).map { x:Int => act ! (x * 10) }
act ! 2.0
act ! "bar"

输出结果:

Received : foo
Received : 0
Received : bar
Received : 10
Received : 20
Received : 30
Received : 40
Received : 1.0
Received : 50
Received : 2.0
Received : 60
Received : 70
Received : 80
Received : 90

(2)使用Work stealing event dispatcher

public class FooActor extends UntypedActor {
  public static MessageDispatcher dispatcher = Dispatchers
    .newExecutorBasedEventDrivenWorkStealingDispatcher("foobar", 5)
    .build();

  private static int currentId = 0;

  private final int instanceId;

  private int count = 0;

  public FooActor() {
    getContext().setDispatcher(dispatcher);
    instanceId = currentId++;
}

接受端:

@Override
public void onReceive(Object message) {
    System.out.printf(
        "Foo %d processed : %s (count = %d) on Thread : %s\n",
        InstanceId,
        message.toString(),
        ++count,
        Thread.currentThread().getName()
    );
    try { Thread.sleep(instanceId * 50 + 50); }
    catch (InterruptedException ex) { }
}

输出结果:

Foo 0 processed : 0 (count = 1) on Thread : akka:event-driven:dispatcher:foobar-1
Foo 1 processed : 1 (count = 1) on Thread : akka:event-driven:dispatcher:foobar-2
Foo 0 processed : 2 (count = 2) on Thread : akka:event-driven:dispatcher:foobar-3
Foo 1 processed : 3 (count = 2) on Thread : akka:event-driven:dispatcher:foobar-4
Foo 0 processed : 4 (count = 3) on Thread : akka:event-driven:dispatcher:foobar-5
Foo 1 processed : 5 (count = 3) on Thread : akka:event-driven:dispatcher:foobar-6
Foo 0 processed : 6 (count = 4) on Thread : akka:event-driven:dispatcher:foobar-7
Foo 0 processed : 7 (count = 5) on Thread : akka:event-driven:dispatcher:foobar-8
Foo 1 processed : 8 (count = 4) on Thread : akka:event-driven:dispatcher:foobar-9
Foo 0 processed : 9 (count = 6) on Thread : akka:event-driven:dispatcher:foobar-10

Foo 1 processed : 93 (count = 40) on Thread : akka:event-driven:dispatcher:foobar-15
Foo 0 processed : 94 (count = 55) on Thread : akka:event-driven:dispatcher:foobar-13
Foo 0 processed : 95 (count = 56) on Thread : akka:event-driven:dispatcher:foobar-13
Foo 0 processed : 96 (count = 57) on Thread : akka:event-driven:dispatcher:foobar-16
Foo 1 processed : 97 (count = 41) on Thread : akka:event-driven:dispatcher:foobar-14
Foo 0 processed : 98 (count = 58) on Thread : akka:event-driven:dispatcher:foobar-4
Foo 1 processed : 99 (count = 42) on Thread : akka:event-driven:dispatcher:foobar-4

(3)使用Thread based dispatcher

getContext().setDispatcher( Dispatchers.newThreadBasedDispatcher(getContext()));.

self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)

差劲的扩展性和性能
适合做后端守护"daemon" actors
低频率消息
会堵塞Blocking

 

路由和负载平衡

InfiniteIterator<ActorRef> iter = new CyclicIterator<ActorRef>(Arrays.asList(
    actorOf(FooActor.class).start(),
    actorOf(BarActor.class).start()
));

for (Integer value:Arrays.asList(4, 8, 15, 16, 23, 42)) {
    iter.next().sendOneWay(value);
}

注意输出:

Bar: 8
Foo: 4
Bar: 16
Foo: 15
Bar: 42
Foo: 23

 

ActorRef router = actorOf(new UntypedActorFactory() {
    public UntypedActor create() {
    return new UntypedLoadBalancer() {
        private InfiniteIterator iterator =
        new CyclicIterator<ActorRef>(Arrays.asList(
            actorOf(FooActor.class).start(),
            actorOf(BarActor.class).start()));

        public InfiniteIterator<ActorRef> seq() {
            return iterator;
        }
};
}}).start();

for (Integer value :
    Arrays.asList(4, 8, 15, 16, 23, 42)) {
        router.sendOneWay(value);
}

输出:

Foo received '3' count = 1
Foo received '4' count = 2
Foo received '6' count = 3
Foo received '8' count = 4
Foo received '9' count = 5
Foo received '10' count = 6
Foo received '11' count = 7
Foo received '12' count = 8
Foo received '13' count = 9
Foo received '14' count = 10
Foo received '15' count = 11
Bar received '1' count = 1
Bar received '2' count = 2
Bar received '5' count = 3
Bar received '7' count = 4

 

下页

Akka教程

单独写原则

Actor专题

Go Reactive宣言

AKKA专题

Reactive专题

EDA事件驱动

Scala入门之函数编程