Actors模型专题
AKKA框架
监管和容错Supervision and fault tolerance
"Let it Crash" 任由它崩溃(顺势而为)
通过Supervisor管理inked Actors
当linked Actor崩溃时通知
策略:OneForOne和
AllForOne
适合远程
Actorsval supervisor = Supervisor(
SupervisorConfig(
RestartStrategy(OneForOne, 3, 1000, List(classOf[Exception])),
Supervise(
actorOf[MyActor],
LifeCycle(Permanent)) ::
Nil))
supervisor.link(actorOf[MyActor])
class MyActor extends Actor {
self.lifeCycle = Some(LifeCycle(Permanent))
override def preRestart(reason: Throwable) {
// clean things up
}
}
管理事务STM
软件事务内存Software Transactional Memory (STM)
ACID中
只有ACI没有D
Actors + STM
如果Actor's写操作失败,在内存中回滚重试
提供两种数据结构支持STM:
TransactionalMap
TransactionalVecto
val tranMap = TransactionalMap[String, Int]("Foo" -> 23)
val tranVector = TransactionalVector(2.0, 4.0, 6.0)
try {
atomic {
tranMap += ("Bar" -> 42)
tranVector.add(8.0)
println(tranMap)
println(tranVector)
1 / 0
}
} catch { case ex:Any => println(ex) }
atomic { println(tranMap); println(tranVector)
}
Map(Bar -> 42, Foo -> 23)
TransactionalVector(2.0, 4.0, 6.0, 8.0)
java.lang.ArithmeticException: / by zero
Map(Foo -> 23)
TransactionalVector(2.0, 4.0, 6.0)
协调事务Coordinated transactions
以转载为案例,一个账户减去金额 一个账户增加同等金额,保证整个过程事务性,要么全部成功,要么全部失败。
class CoordinatedAccountActor extends Actor {
private val balance:Ref[Int] = Ref(0)
def doReceive:Receive = {
case Withdraw(x) if x > balance.get =>
throw new Exception("Overdrawn")
case Withdraw(x) if x > 0 =>
balance.alter(_ - x)
case Deposit(x) if x > 0 =>
balance.alter(_ + x)
case PrintBalance =>
println("Balance = " + balance.get)
}
def receive = {
case coordinated @ Coordinated(action) => coordinated atomic {
doReceive(action)
}
case other:Any => doReceive(other)
}
}
客户端调用:
account1 ! Deposit(100)
account2 ! Deposit(50)
val coordinated = Coordinated()
try {
coordinated atomic {
account1 ! coordinated.coordinate(Deposit(75))
account2 ! coordinated.coordinate(Withdraw(75))
}
} catch {
case ex:Exception => println(ex.getMessage)
}
两个账户初始值是100和75,当转账75时 出错:
org.multiverse.templates.InvisibleCheckedException: java.lang.Exception: Overdrawn
account1 ! PrintBalance
Balance = 100
account2 ! PrintBalance
Balance = 50
再看Typed Actor案例:
public interface BankAccount {
@Coordinated public void withdraw(int amount);
@Coordinated public void deposit(int amount);
public int getBalance();
}
public class BankAccountTypedActor extends TypedActor
implements BankAccount {
private Ref<Integer> balance = new Ref<Integer>(0);
public void withdraw(int amount) {
if (amount > 0) {
if (amount > balance.get()) {
throw new RuntimeException("Overdrawn");
} else {
balance.set(balance.get() - amount);
}
}
}
public void deposit(int amount) {
if (amount > 0) { balance.set(balance.get() + amount); }
}
public int getBalance() {
return balance.get();
}
}
两种转账方法:原子性是安全的,另外一种非安全:
public static void transferUnsafe(BankAccount fromAccount, BankAccount toAccount,int amount)
{
toAccount.deposit(amount);
fromAccount.withdraw(amount);
}
public static void transferSafe(
final BankAccount fromAccount,
final BankAccount toAccount,
final int amount)
{
Coordination.coordinate(true, new Atomically() {
public void atomically() {
toAccount.deposit(amount);
fromAccount.withdraw(amount);
}
});
}
当转账150时运行代码和结果:
BankAccount account1 = (BankAccount) TypedActor
.newInstance(BankAccount.class, BankAccountTypedActor.class, 1000);
BankAccount account2 = (BankAccount) TypedActor
.newInstance(BankAccount.class, BankAccountTypedActor.class, 1000);
account1.deposit(100);
account2.deposit(50);
try {
transferSafe(account1, account2, 150);
} catch (Exception ex) {
System.err.println(ex.getMessage());
}
System.out.println(account1.getBalance());
100
System.out.println(account2.getBalance());
50
java.lang.RuntimeException: Overdrawn
因为原来账户只有100,无法转账150元,系统自动报错。
如果调用不安全的非原子方法:
transferUnsafe(account1, account2, 150);
输出结果:
System.out.println(account1.getBalance());
100
System.out.println(account2.getBalance());
200
java.lang.RuntimeException: Overdrawn
与Spring整合
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:akka="http://akka.io/schema/akka"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://akka.io/schema/akka
http://akka.io/akka-1.1.xsd">
<akka:untyped-actor id="toStringActor"
implementation="tfd.akkatest.java.ToStringActor"
scope="singleton"
autostart="true"/>
</beans>
Spring支持:
Typed Actors
Remote Actors
Dispatchers
Supervisors
Camel endpoints
Akka教程
单独写原则
Actor专题
Go Reactive宣言
AKKA专题
Reactive专题
EDA事件驱动