Actor Model Series

The Akka Framework


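Supervision and fault tolerance

"Let it Crash": let a failing actor crash and go with the flow, instead of coding defensively around every error
Linked actors are managed through a Supervisor
The supervisor is notified when a linked actor crashes
Strategies: OneForOne (restart only the crashed actor) and AllForOne (restart all actors linked to the supervisor)
Also works with remote actors

val supervisor = Supervisor(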
    SupervisorConfig(
        RestartStrategy(OneForOne, 3, 1000, List(classOf[Exception])),
        Supervise(
            actorOf[MyActor],
            LifeCycle(Permanent)) ::
        Nil))


supervisor.link(actorOf[MyActor])


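class MyActor extends Actor {
    // A permanent actor is always restarted by its supervisor
    self.lifeCycle = Some(LifeCycle(Permanent))

    // Placeholder message handler so the class is complete
    def receive = {
        case _ => // handle messages here
    }

    override def preRestart(reason: Throwable) {
        // clean things up
    }
}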
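
The numbers in RestartStrategy(OneForOne, 3, 1000, ...) read as "at most 3 restarts within 1000 ms". The following plain-Java sketch illustrates that idea together with the OneForOne/AllForOne choice. It is not Akka code: RestartSketch, RestartPolicy, tryRestart and childrenToRestart are hypothetical names, and the sliding time window is an assumption about how such a limit could be counted, not a description of Akka's implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

// Illustration only: every name here is hypothetical, nothing is Akka API.
class RestartSketch {

    enum Strategy { ONE_FOR_ONE, ALL_FOR_ONE }

    /** Which linked children get restarted when one of them crashes. */
    static List<String> childrenToRestart(Strategy strategy, String crashed, List<String> linked) {
        return strategy == Strategy.ONE_FOR_ONE
                ? Collections.singletonList(crashed)   // only the crashed child
                : new ArrayList<>(linked);             // every linked child
    }

    /** Allows at most maxRetries restarts within any window of withinMillis (assumed sliding window). */
    static final class RestartPolicy {
        private final int maxRetries;
        private final long withinMillis;
        private final Deque<Long> restartTimes = new ArrayDeque<>();

        RestartPolicy(int maxRetries, long withinMillis) {
            this.maxRetries = maxRetries;
            this.withinMillis = withinMillis;
        }

        /** Returns true and records the restart if it is allowed at time nowMillis. */
        boolean tryRestart(long nowMillis) {
            // Forget restarts that have fallen out of the time window.
            while (!restartTimes.isEmpty() && nowMillis - restartTimes.peekFirst() >= withinMillis) {
                restartTimes.pollFirst();
            }
            if (restartTimes.size() >= maxRetries) {
                return false; // too many crashes in the window: give up on the child
            }
            restartTimes.addLast(nowMillis);
            return true;
        }
    }

    public static void main(String[] args) {
        RestartPolicy policy = new RestartPolicy(3, 1000);
        long[] crashTimes = {0, 400, 800, 1200, 1300};
        for (long t : crashTimes) {
            System.out.println("crash at " + t + " ms -> " + (policy.tryRestart(t) ? "restart" : "stop"));
        }
        List<String> linked = Arrays.asList("a", "b", "c");
        System.out.println(childrenToRestart(Strategy.ONE_FOR_ONE, "b", linked));
        System.out.println(childrenToRestart(Strategy.ALL_FOR_ONE, "b", linked));
    }
}
```

In this sketch the crash at 1200 ms is still restarted because the restart at 0 ms has left the window, while the crash at 1300 ms is refused because three restarts (400, 800, 1200) are still inside it.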

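Managing transactions with STM

Software Transactional Memory (STM)
Of the ACID properties it provides A, C and I, but not D (durability)
Actors + STM
If an actor's write fails, the change is rolled back in memory and retried
Two data structures support STM:
TransactionalMap
TransactionalVector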

val tranMap = TransactionalMap[String, Int]("Foo" -> 23)
val tranVector = TransactionalVector(2.0, 4.0, 6.0)

try {
    atomic {
        tranMap += ("Bar" -> 42)
        tranVector.add(8.0)

        println(tranMap)
        println(tranVector)
        1 / 0    // throws ArithmeticException, so the transaction is rolled back
    }
} catch { case ex: Any => println(ex) }

// Read the state again in a new transaction
atomic { println(tranMap); println(tranVector) }

Output:

Map(Bar -> 42, Foo -> 23)
TransactionalVector(2.0, 4.0, 6.0, 8.0)
java.lang.ArithmeticException: / by zero
Map(Foo -> 23)
TransactionalVector(2.0, 4.0, 6.0)
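
The rollback seen in this output can be imitated in plain Java by running the transaction body against private copies and publishing them only if the body finishes normally. This is a minimal single-threaded sketch of that one idea, not how Akka or Multiverse implement STM: RollbackSketch and its atomic method are hypothetical names, and there is no retry, isolation between threads, or conflict detection here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

// Illustration only: every name here is hypothetical, nothing is Akka or Multiverse API.
class RollbackSketch {
    // Committed state, standing in for the TransactionalMap and TransactionalVector of the example.
    private Map<String, Integer> map = new TreeMap<>();
    private List<Double> vector = new ArrayList<>();

    Map<String, Integer> map() { return map; }
    List<Double> vector() { return vector; }

    /** Runs body on copies of the state and commits the copies only if body does not throw. */
    void atomic(BiConsumer<Map<String, Integer>, List<Double>> body) {
        Map<String, Integer> mapCopy = new TreeMap<>(map);
        List<Double> vectorCopy = new ArrayList<>(vector);
        body.accept(mapCopy, vectorCopy); // an exception here skips the commit below
        map = mapCopy;
        vector = vectorCopy;
    }

    public static void main(String[] args) {
        RollbackSketch state = new RollbackSketch();
        state.atomic((m, v) -> { m.put("Foo", 23); v.add(2.0); v.add(4.0); v.add(6.0); });

        try {
            state.atomic((m, v) -> {
                m.put("Bar", 42);
                v.add(8.0);
                System.out.println(m + " " + v); // the body sees its own uncommitted writes
                int zero = 0;
                System.out.println(1 / zero);    // throws ArithmeticException
            });
        } catch (ArithmeticException ex) {
            System.out.println(ex);
        }

        // The failed body never committed, so "Bar" and 8.0 are gone.
        System.out.println(state.map() + " " + state.vector());
    }
}
```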

 

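Coordinated transactions

Take a money transfer as an example: one account is debited and another is credited by the same amount, and the whole operation must be transactional, so that either both steps succeed or both fail.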

class CoordinatedAccountActor extends Actor {
    private val balance: Ref[Int] = Ref(0)

    def doReceive: Receive = {
        case Withdraw(x) if x > balance.get =>
            throw new Exception("Overdrawn")
        case Withdraw(x) if x > 0 =>
            balance.alter(_ - x)
        case Deposit(x) if x > 0 =>
            balance.alter(_ + x)
        case PrintBalance =>
            println("Balance = " + balance.get)
    }

    def receive = {
        case coordinated @ Coordinated(action) => coordinated atomic {
            doReceive(action)
        }
        case other: Any => doReceive(other)
    }
}

Client code:

account1 ! Deposit(100)
account2 ! Deposit(50)

val coordinated = Coordinated()

try {
    coordinated atomic {
        account1 ! coordinated.coordinate(Deposit(75))
        account2 ! coordinated.coordinate(Withdraw(75))
    }
} catch {
    case ex: Exception => println(ex.getMessage)
}

The two accounts start at 100 and 50, so withdrawing 75 from account2 fails:
org.multiverse.templates.InvisibleCheckedException: java.lang.Exception: Overdrawn

account1 ! PrintBalance
Balance = 100
account2 ! PrintBalance
Balance = 50

Now the same example with a Typed Actor:

public interface BankAccount {
    @Coordinated public void withdraw(int amount);
    @Coordinated public void deposit(int amount);
    public int getBalance();
}

public class BankAccountTypedActor extends TypedActor
        implements BankAccount {
    private Ref<Integer> balance = new Ref<Integer>(0);

    public void withdraw(int amount) {
        if (amount > 0) {
            if (amount > balance.get()) {
                throw new RuntimeException("Overdrawn");
            } else {
                balance.set(balance.get() - amount);
            }
        }
    }

    public void deposit(int amount) {
        if (amount > 0) { balance.set(balance.get() + amount); }
    }

    public int getBalance() {
        return balance.get();
    }
}

Two transfer methods follow: the atomic one is safe, the other is not:

public static void transferUnsafe(BankAccount fromAccount, BankAccount toAccount, int amount)
{
    toAccount.deposit(amount);
    fromAccount.withdraw(amount);
}

public static void transferSafe(final BankAccount fromAccount, final BankAccount toAccount, final int amount)
{
    Coordination.coordinate(true, new Atomically() {
        public void atomically() {
            toAccount.deposit(amount);
            fromAccount.withdraw(amount);
        }
    });
}

Code and output when transferring 150:

BankAccount account1 = (BankAccount) TypedActor.newInstance(BankAccount.class, BankAccountTypedActor.class, 1000);
BankAccount account2 = (BankAccount) TypedActor.newInstance(BankAccount.class, BankAccountTypedActor.class, 1000);

account1.deposit(100);
account2.deposit(50);

try {
    transferSafe(account1, account2, 150);
} catch (Exception ex) {
    System.err.println(ex.getMessage());
}

System.out.println(account1.getBalance());
100
System.out.println(account2.getBalance());
50
java.lang.RuntimeException: Overdrawn

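Because the source account holds only 100, it cannot transfer 150: the withdrawal throws an exception, and the deposit made in the same coordinated transaction is rolled back as well.

If the unsafe, non-atomic method is called instead:

transferUnsafe(account1, account2, 150);

Output: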

System.out.println(account1.getBalance());
100
System.out.println(account2.getBalance());
200
java.lang.RuntimeException: Overdrawn
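
The 100 / 200 result comes from the order of operations: the deposit into account2 has already been applied when the withdrawal from account1 fails, and nothing undoes it. The plain-Java sketch below reproduces both outcomes without Akka. TransferSketch, Account, restore and transferWithRollback are hypothetical names, and restoring saved balances by hand is only a single-threaded stand-in for what the coordinated transaction does in the safe version.

```java
// Illustration only: every name here is hypothetical, nothing is Akka API.
class TransferSketch {

    static final class Account {
        private int balance;

        Account(int initial) { balance = initial; }

        void deposit(int amount) {
            if (amount > 0) { balance += amount; }
        }

        void withdraw(int amount) {
            if (amount <= 0) { return; }
            if (amount > balance) { throw new RuntimeException("Overdrawn"); }
            balance -= amount;
        }

        int getBalance() { return balance; }

        void restore(int saved) { balance = saved; }
    }

    /** Same order as transferUnsafe in the text: a failed withdrawal leaves the deposit in place. */
    static void transferUnsafe(Account from, Account to, int amount) {
        to.deposit(amount);
        from.withdraw(amount);
    }

    /** Remembers both balances and puts them back if any step fails. */
    static void transferWithRollback(Account from, Account to, int amount) {
        int fromBefore = from.getBalance();
        int toBefore = to.getBalance();
        try {
            to.deposit(amount);
            from.withdraw(amount);
        } catch (RuntimeException ex) {
            from.restore(fromBefore);
            to.restore(toBefore);
            throw ex; // the caller still sees the failure
        }
    }

    public static void main(String[] args) {
        Account account1 = new Account(100);
        Account account2 = new Account(50);

        try {
            transferWithRollback(account1, account2, 150);
        } catch (RuntimeException ex) {
            System.out.println(ex.getMessage());
        }
        System.out.println(account1.getBalance() + " " + account2.getBalance()); // 100 50

        try {
            transferUnsafe(account1, account2, 150);
        } catch (RuntimeException ex) {
            System.out.println(ex.getMessage());
        }
        System.out.println(account1.getBalance() + " " + account2.getBalance()); // 100 200
    }
}
```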

 

Spring integration

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:akka="http://akka.io/schema/akka"
    xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://akka.io/schema/akka
    http://akka.io/akka-1.1.xsd">

<akka:untyped-actor id="toStringActor"
    implementation="tfd.akkatest.java.ToStringActor"
    scope="singleton"
    autostart="true"/>
</beans>

The Spring integration supports:

Typed Actors
Remote Actors
Dispatchers
Supervisors
Camel endpoints

 
