异步架构思维:使用Akka实现领域建模

去年我们在Jdon.com狠狠地讨论了异步架构,这个异步不是一般意义上的异步,不那种无需实时返回结果的异步,而是与是否需要返回结果无关的异步,应该属于一种并发策略,是在多核时代的并行计算和分布式计算思维的体现。

今天,Thinking Asynchronous - Domain Modeling using Akka Transactors - Part 1一文介绍了使用基于Scala语言的Akka框架如何实现异步思维,异步思维已经渗透到语言中,成为一种天然的编程习惯,颠覆了顺序编程的传统。

Akka框架之前介绍过,是和DCI架构结合的,我们已经发现软件不同领域的发展最终都已经汇聚集合到一起,这本身也是一种Map/reduce哦。

闲话少说,让我们看看这篇文章是如何写的:

Evans DDD提出仓储repository概念,将对象存储持久和对象本身进行了分离,再加上NoSQL发展,文章引入了key-value存储数据库Redis作为repository的实现。

假设有一个Account对象,scala代码如下:


case class Account(no: String,
name: String,
dateOfOpening: Date,
dateOfClose: Option[Date],
balance: Float)

有几个字段no 号码,name名称等等,设想场景:打开一个新的Opening a New Account, 然后查询这个账户的余额, 在这个帐号加入一笔金额数字。

实现这个功能,一般是通过scala的消息机制来实现,实现命令如下:


sealed trait AccountEvent
case class Open(from: String, no: String, name: String) extends AccountEvent
case class New(account: Account) extends AccountEvent
case class Balance(from: String, no: String) extends AccountEvent
case class Post(from: String, no: String, amount: Float) extends AccountEvent

其中trait是类似Qi4j中实现DCI架构的场景混合器,这里是为事件AccountEvent,Open为打开账户事件;New是新账户,Balance是查询余额事件,Post是加入新纪录事件。

这些事件被客户端发出,由Domain service领域服务拦截,最终发往仓储Repsitory,Akka transactor可以提供基于事件的容错性,如果事件发生冲突崩溃如何进行处理等都可以由代码指定。


trait AccountRepository extends Actor

class RedisAccountRepository extends AccountRepository {
//如果事件冲突崩溃,再来一次
lifeCycle = Some(LifeCycle(Permanent))
val STORAGE_ID =
"account.storage"

// actual persistent store Account实例集合 同时使用NoSQL数据库持久化
private var accounts = atomic { RedisStorage.getMap(STORAGE_ID) }

def receive = {
//接受New事件
case New(a) =>
atomic {
//新加载账户,key为no,Value(BLOB)是Account对象的序列化数组。
accounts.+=(a.no.getBytes, toByteArray[Account](a))
}
//接受查询余额事件处理
case Balance(from, no) =>
val b = atomic { accounts.get(no.getBytes) }
b match {
case None => reply(None)
case Some(a) =>
val acc = fromByteArray[Account](a).asInstanceOf[Account]
reply(Some(acc.balance))
}

//.. other message handlers
}
//崩溃失败重新如何处理?
override def postRestart(reason: Throwable) = {
accounts = RedisStorage.getMap(STORAGE_ID)
}
}

从以上看出:
1。Akka是一个基于let-it-crash 任其崩溃的方针. 通过LifeCycle(Permanent)定义崩溃后处理方式,Permanent是重新启动 Temporary则是不启动而直接关闭。

2.重新启动的Hook钩子方法你可以直接定义,如postRestart 就是重新启动加载accounts集合。

3.Akka使用使用一种基于Java的事务机制 multiverse来实现事务,使用atomic {}就可以。

异步消息能够实现端对端的解耦,不会堵塞,更易于管理,典型基于领域模型的的actor实现是轻量的,可以有数百万个actors,它们之间通过消息来协作交互,非常符合对象职责和交互的领域模型

说句题外话,jdon推出的开源框架Jdonframework通过domain events来实现领域模型与仓储repository等技术架构实现交互,虽然比不上scala+Akka这么彻底,但在设计理念上很相近,尤其还是使用Java平台。见Domain Events异步应用, JF的PPT中也有更多使用异步消息实现数据持久加载说明:http://www.jdon.com/jdonframework/

剑桥两位博士开发了一个支持JAVA语言中使用Coroutine的kilim框架,此框架更加方便java人员使用Coroutine。kilim里面有几个关键的概念:
1 Task
Task我们可以理解为Actor,用法非常类似于JAVA语言中的Thread,只需要继承Task,override execute方法即可。

2 Mailbox
kilim利用Mailbox来发送消息和接受消息。
mailbox.put(message);//同步发送
mailbox.putnb(message);//异步发送
mailbox.putbmessage);//阻塞发送

相应的也有对象的接受消息的方法:
mailbox.get(); // 同步接收,传入一个long参数表示等待的超时时间

mailbox.getnb() //异步接受,立刻返回

mailbox.getb()//阻塞接受


http://kilim.malhar.net/
[该贴被xmuzyu于2010-03-22 23:08修改过]

2010年03月22日 17:57 "banq"的内容
这些事件被客户端发出,由Domain service领域服务拦截,最终发往仓储Repsitory ...

这个架构可以见讨论:这样基于Domain Event的分层是否合理?,通过事件来消除和服务层的耦合,再结合一个场景混合器,也就是DCI架构,就是把服务层和实体一起注射到当前场景对象中,扮演场景角色完成功能。

这个场景混合器应该类似Akka transactor实现的trait AccountRepository extends Actor,类似使用qi4j实现DCI架构的AssignmentsMixin。

也就是说,框架应该提供一个Context或Actor,这个Actor实际也是一个MessageListerner,事件监听者或者观察者,在这个Actor中,需要注射实体的数据,注射需要的服务或仓储。

以上面Akka这段代码为例子,如下:


def receive = {
//接受New事件
case New(a) =>

//接受查询余额事件处理
case Balance(from, no) =>

//.. other message handlers
}

这段异步消息的处理程序和Jdonframework中messageListener很类似:


public interface MessageListener
{
void action(DomainMessage paramDomainMessage);
}

这个接口很符合SOLID原则的单一职责,也与EJB中JMS的响应类MDB接口类似。

上面def receive代码中New和Balance方法其实就可以用MessageListener接口的两个实现NewMessageListener和BalanceMessageListener实现。


public class BalanceMessageListener implements MessageListener{

//接受查询余额事件处理
public action(DomainMessage paramDomainMessage){

}
}

与Akka的区别就是,Akka将这些方法混合在一个所谓场景类中,而JF则是自然分散的,运行时混合在一起,不过我们看不见这种场景对象,我个人认为这种方式更优雅啊,当然有孩子都是自己好的嫌疑。

再看看使用qi4j实现DCI架构中,使用Qi4j的class AssignmentsMixin:


AssignmentsMixin
implements Assignments
{
@This //自动注入Data
AssignmentsData data;

//选择其中一个分配给分配者
public void assignTo( Assignable assignable, Assignee assignee )
{

}

//查询可分配者的分配Assignments列表
public Iterable<Assignable> assignments()
{
}
}

其中交互行为assignTo方法也可以使用MessageListener接口实现,而且也不需要明显的Mixin类。

暂时比较考虑到这里:Jdonframework从一个新的角度来考虑DCI架构实现,角度决定架构,这种从domain events角度好像比较优雅实现DCI中场景,无需编程时做一个场景类或者一个所谓混合器Mixin或Trait,也许随着思考深入会发现Mixin和Trait存在的必要呢。

补充:刚巧在Getting started with ZK CDI中看到类似事件注入做法,注意这里是CDI,是JavaEE6的一个标准 Contexts and Dependency Inject

这个文章在控制器中写了场景混合,一般控制器或SOA的服务都是场景发生地,如下:


@Named
@SessionScoped
public class HelloWorld extends GenericComposer implements Serializable {

@Inject @ComponentId("guestName") Textbox guestName;
@Inject @ComponentId(
"sayHelloBtn") Button sayHelloBtn;
@Inject @ComponentId(
"helloWindow") Window helloWindow;

public void sayHello(@Observes @Events(
"sayHelloBtn.onClick") MouseEvent evt) {
helloWindow.setTitle(
"Hello " + guestName.getValue());
}
}

这个类名叫GenericComposer ,和Qi4j的面向组合Composer名称类似,意思在这里发生场景组合了,sayHello中来自事件源:按钮点击,说明sayHello也是一种事件监听者或者观察者。通过依赖注入IOC/DI和事件模式结合实现场景混合,这个思路和Jdonframework的MessageListener比较类似。考虑角度很像。但是还是有区别的:CDI是什么?


[该贴被banq于2010-03-24 14:04修改过]

scala的例子,哈哈,我输出一个链结:

http://www.scala-liftweb.com