异步架构思维:使用Akka实现领域建模

10-03-22 banq
去年我们在Jdon.com狠狠地讨论了异步架构,这个异步不是一般意义上的异步,不那种无需实时返回结果的异步,而是与是否需要返回结果无关的异步,应该属于一种并发策略,是在多核时代的并行计算和分布式计算思维的体现。

今天,Thinking Asynchronous - Domain Modeling using Akka Transactors - Part 1一文介绍了使用基于Scala语言的Akka框架如何实现异步思维,异步思维已经渗透到语言中,成为一种天然的编程习惯,颠覆了顺序编程的传统。

Akka框架之前介绍过,是和DCI架构结合的,我们已经发现软件不同领域的发展最终都已经汇聚集合到一起,这本身也是一种Map/reduce哦。

闲话少说,让我们看看这篇文章是如何写的:

Evans DDD提出仓储repository概念,将对象存储持久和对象本身进行了分离,再加上NoSQL发展,文章引入了key-value存储数据库Redis作为repository的实现。

假设有一个Account对象,scala代码如下:

case class Account(no: String, 
  name: String, 
  dateOfOpening: Date, 
  dateOfClose: Option[Date],
  balance: Float)
<p>

有几个字段no 号码,name名称等等,设想场景:打开一个新的Opening a New Account, 然后查询这个账户的余额, 在这个帐号加入一笔金额数字。

实现这个功能,一般是通过scala的消息机制来实现,实现命令如下:

sealed trait AccountEvent
case class Open(from: String, no: String, name: String) extends AccountEvent
case class New(account: Account) extends AccountEvent
case class Balance(from: String, no: String) extends AccountEvent
case class Post(from: String, no: String, amount: Float) extends AccountEvent
<p>

其中trait是类似Qi4j中实现DCI架构的场景混合器,这里是为事件AccountEvent,Open为打开账户事件;New是新账户,Balance是查询余额事件,Post是加入新纪录事件。

这些事件被客户端发出,由Domain service领域服务拦截,最终发往仓储Repsitory,Akka transactor可以提供基于事件的容错性,如果事件发生冲突崩溃如何进行处理等都可以由代码指定。

trait AccountRepository extends Actor

class RedisAccountRepository extends AccountRepository {
  //如果事件冲突崩溃,再来一次
  lifeCycle = Some(LifeCycle(Permanent))    
  val STORAGE_ID = "account.storage"

  // actual persistent store Account实例集合 同时使用NoSQL数据库持久化
  private var accounts = atomic { RedisStorage.getMap(STORAGE_ID) }

  def receive = {
    //接受New事件
    case New(a) => 
      atomic {
        //新加载账户,key为no,Value(BLOB)是Account对象的序列化数组。
        accounts.+=(a.no.getBytes, toByteArray[Account](a))  
      }
     //接受查询余额事件处理
    case Balance(from, no) =>
      val b = atomic { accounts.get(no.getBytes) }
      b match {
        case None => reply(None)
        case Some(a) => 
          val acc = fromByteArray[Account](a).asInstanceOf[Account]
          reply(Some(acc.balance))
      }

      //.. other message handlers
  }
  //崩溃失败重新如何处理?
  override def postRestart(reason: Throwable) = {
    accounts = RedisStorage.getMap(STORAGE_ID)  
  }
}
<p>

从以上看出:

1。Akka是一个基于let-it-crash 任其崩溃的方针. 通过LifeCycle(Permanent)定义崩溃后处理方式,Permanent是重新启动 Temporary则是不启动而直接关闭。

2.重新启动的Hook钩子方法你可以直接定义,如postRestart 就是重新启动加载accounts集合。

3.Akka使用使用一种基于Java的事务机制 multiverse来实现事务,使用atomic {}就可以。

异步消息能够实现端对端的解耦,不会堵塞,更易于管理,典型基于领域模型的的actor实现是轻量的,可以有数百万个actors,它们之间通过消息来协作交互,非常符合对象职责和交互的领域模型

说句题外话,jdon推出的开源框架Jdonframework通过domain events来实现领域模型与仓储repository等技术架构实现交互,虽然比不上scala+Akka这么彻底,但在设计理念上很相近,尤其还是使用Java平台。见Domain Events异步应用, JF的PPT中也有更多使用异步消息实现数据持久加载说明:http://www.jdon.com/jdonframework/

                   

4
xmuzyu
2010-03-22 23:06
剑桥两位博士开发了一个支持JAVA语言中使用Coroutine的kilim框架,此框架更加方便java人员使用Coroutine。kilim里面有几个关键的概念:

1 Task

Task我们可以理解为Actor,用法非常类似于JAVA语言中的Thread,只需要继承Task,override execute方法即可。

2 Mailbox

kilim利用Mailbox来发送消息和接受消息。

mailbox.put(message);//同步发送

mailbox.putnb(message);//异步发送

mailbox.putbmessage);//阻塞发送

相应的也有对象的接受消息的方法:

mailbox.get(); // 同步接收,传入一个long参数表示等待的超时时间

mailbox.getnb() //异步接受,立刻返回

mailbox.getb()//阻塞接受

http://kilim.malhar.net/

[该贴被xmuzyu于2010-03-22 23:08修改过]

banq
2010-03-23 18:05
2010年03月22日 17:57 "banq"的内容
这些事件被客户端发出,由Domain service领域服务拦截,最终发往仓储Repsitory ...

这个架构可以见讨论:这样基于Domain Event的分层是否合理?,通过事件来消除和服务层的耦合,再结合一个场景混合器,也就是DCI架构,就是把服务层和实体一起注射到当前场景对象中,扮演场景角色完成功能。

这个场景混合器应该类似Akka transactor实现的trait AccountRepository extends Actor,类似使用qi4j实现DCI架构的AssignmentsMixin。

也就是说,框架应该提供一个Context或Actor,这个Actor实际也是一个MessageListerner,事件监听者或者观察者,在这个Actor中,需要注射实体的数据,注射需要的服务或仓储。

以上面Akka这段代码为例子,如下:

def receive = {
    //接受New事件
    case New(a) => 
    
     //接受查询余额事件处理
    case Balance(from, no) =>
    
      //.. other message handlers
  }
<p>

这段异步消息的处理程序和Jdonframework中messageListener很类似:

public interface MessageListener
{
   void action(DomainMessage paramDomainMessage);
}
<p>

这个接口很符合SOLID原则的单一职责,也与EJB中JMS的响应类MDB接口类似。

上面def receive代码中New和Balance方法其实就可以用MessageListener接口的两个实现NewMessageListener和BalanceMessageListener实现。

public class BalanceMessageListener implements MessageListener{

   //接受查询余额事件处理
    public action(DomainMessage paramDomainMessage){

    } 
}
<p>

与Akka的区别就是,Akka将这些方法混合在一个所谓场景类中,而JF则是自然分散的,运行时混合在一起,不过我们看不见这种场景对象,我个人认为这种方式更优雅啊,当然有孩子都是自己好的嫌疑。

再看看使用qi4j实现DCI架构中,使用Qi4j的class AssignmentsMixin:

AssignmentsMixin
     implements Assignments
 {
     @This   //自动注入Data
     AssignmentsData data;

     //选择其中一个分配给分配者
     public void assignTo( Assignable assignable, Assignee assignee )
     {
      
     }

     //查询可分配者的分配Assignments列表
     public Iterable<Assignable> assignments()
     {
     }
 }
<p>

其中交互行为assignTo方法也可以使用MessageListener接口实现,而且也不需要明显的Mixin类。

暂时比较考虑到这里:Jdonframework从一个新的角度来考虑DCI架构实现,角度决定架构,这种从domain events角度好像比较优雅实现DCI中场景,无需编程时做一个场景类或者一个所谓混合器Mixin或Trait,也许随着思考深入会发现Mixin和Trait存在的必要呢。

补充:刚巧在Getting started with ZK CDI中看到类似事件注入做法,注意这里是CDI,是JavaEE6的一个标准 Contexts and Dependency Inject

这个文章在控制器中写了场景混合,一般控制器或SOA的服务都是场景发生地,如下:

@Named
@SessionScoped
public class HelloWorld extends GenericComposer implements Serializable {
 
	@Inject @ComponentId("guestName") Textbox guestName;
	@Inject @ComponentId("sayHelloBtn") Button sayHelloBtn;
	@Inject @ComponentId("helloWindow") Window helloWindow;
 
	public void sayHello(@Observes @Events("sayHelloBtn.onClick") MouseEvent evt) {
		helloWindow.setTitle("Hello " + guestName.getValue());
	}
}
<p>

这个类名叫GenericComposer ,和Qi4j的面向组合Composer名称类似,意思在这里发生场景组合了,sayHello中来自事件源:按钮点击,说明sayHello也是一种事件监听者或者观察者。通过依赖注入IOC/DI和事件模式结合实现场景混合,这个思路和Jdonframework的MessageListener比较类似。考虑角度很像。但是还是有区别的:CDI是什么?

[该贴被banq于2010-03-24 14:04修改过]

atester
2010-09-09 16:12
scala的例子,哈哈,我输出一个链结:

http://www.scala-liftweb.com

猜你喜欢