基于spring来实现crqs的一些问题

初学CQRS架构,想在新项目中使用CQRS来进行架构,采用的是java来进行开发,架构采用传统的springMvc+Jpa(Hibernate实现)+Spring4.0+Mysql,暂时没有考虑EventSource的问题,因为事件溯源暂时还有些无法理解。
1、命令总线(CommandBus)实现:
命令总线使用一个Collection来保存所有的命令执行器(ICommandExecutor),在Spring启动的时候通过IOC进行加载注入,当命令进入的时候就去总线中通过反射查询相应的Executor并向后执行(比如调用Domain或DomainService),代码如下:
@[author]Component[/author]
public class DefaultCommandBus implements ICommandBus<ICommandExecutor<ICommand>>{
@[author]Autowired[/author]
private List<ICommandExecutor> executors;
public DefaultCommandBus() {}
//执行命令
public void send(ICommand command) {
ICommandExecutor<ICommand> executor = findCommandExecutor(command);
executor.execute(command);
}
//在所有的命令执行器集合中查找符合条件的执行器
private ICommandExecutor<ICommand> findCommandExecutor(ICommand command){
for (ICommandExecutor<ICommand> ce : executors) {
CommandListener ric = ce.getClass().getAnnotation(CommandListener.class);
Class<? extends ICommand> clazz = ric.commandClass();
if(clazz.getName().equals(command.getClass().getName())){
return ce;
}
}
return null;
}
}
2、事件聚合器(IEventAggragete)实现:(暂时没支持异步事件)
采用了和命令总线差不多的实现方法。
但由于事件和事件处理器是一对多的关系,所以这里采用了Map来实现,Key为指定事件的名称,Values为一个List<IEventHandler>,当Domain或DomainService使用聚合器Pulish一个消息的时候,就去聚合器中查找相应事件对应的一组事件处理器,并循环执行,代码如下:
@[author]Component[/author]
public class EventAggregate implements IEventAggregate {
//实现执行结果回调
private ICallBack callBack;
//注册所有的事件及对应的监听器组
private Map<String,List<IEventHandler<IEvent>>> handlers = new HashMap<String, List<IEventHandler<IEvent>>>();

@[author]Autowired[/author]
private List<IEventHandler> eventHandlers;

//自动订阅事件监听器
@PostConstruct
public void autoSubcribe() {
for (IEventHandler<IEvent> eventHandler : eventHandlers) {
EventListener eventAnno = eventHandler.getClass().getAnnotation(EventListener.class);
if(eventAnno != null){
String eventTypeName = eventAnno.eventClass().getName();//获取该handler感兴趣的事件类的名称
List<IEventHandler<IEvent>> ehs = findEventType(eventTypeName);//在聚合中查找是否有该事件的处理器已经注册
if(ehs != null){//如果存在
ehs.add(eventHandler);
handlers.put(eventTypeName, ehs);
}else{
ehs = new ArrayList<IEventHandler<IEvent>>();
ehs.add(eventHandler);
handlers.put(eventTypeName, ehs);
}
}
}
}
private List<IEventHandler<IEvent>> findEventType(String eventTypeName){
if(handlers.size() != 0){
return handlers.get(eventTypeName);
}
return null;
}
//订阅事件
public <TEvent extends IEvent> void subcribe(IEventHandler<IEvent> handler) {
}
//解约事件处理器
public <TEvent extends IEvent> void desubcribe(IEventHandler<IEvent> handler) {
}
//领域对象发布消息
public <TEvent extends IEvent> void publish(TEvent event) {
String eventTypeName = event.getClass().getName();
List<IEventHandler<IEvent>> ehs = findEventType(eventTypeName);
if(ehs != null){
for (IEventHandler<IEvent> iEventHandler : ehs) {
iEventHandler.handle(event);
if(callBack != null){
callBack.callBack();
}
}
}
}
public void setCallBack(ICallBack callBack) {
this.callBack = callBack;
}
}
支持回调,如果需要进行回调,则需要注入ICallback接口的实现类(一般是实现了ICallback接口的Domain或DominService对象)
以上两点就是CQRS的Command部分的核心实现了,但总觉得有些问题:
1、SpringMVC的前端控制器是DispatherServlet,其实也是servlet,所以必须基于容器来接受请求,当客户端请求进来之后,容器(比如Jetty)会从线程池中调取一个空闲的线程来为该客户端服务,这就保证了每个客户端都有一个指定的线程来处理,即不存在资源竞争的问题(Spring采用了大量的ThreadLocal来保证了线程安全),那是否还需要采用类似Disruptor这种框架来封装命令或事件的派发,因为Disruptor框架也是用来解决安全的发送消息问题的框架;
2、领域对象和领域服务会显示的和事件聚合器发生耦合。(这里如果采用Disruptor这类框架来封装消息的发送,比如封装为静态方法,这才会出现资源竞争的问题,但只有这样才能和Domain或DomainService解耦)
不知道大家对以上两点的看法是怎么样的?性能上的问题还未考虑,如果只是单线程没有资源竞争的情况下,性能应该没有太大问题(一个类似于Tomcat或Jetty的web服务器400左右的并发应该不错了,毕竟是Http的短连接)
[该贴被wilsonp于2014-03-16 20:40修改过]
[该贴被wilsonp于2014-03-16 20:42修改过]
[该贴被wilsonp于2014-03-16 21:51修改过]
[该贴被wilsonp于2014-03-16 21:52修改过]

非常赞赏这种探索,挺好的。

我注意到你提到:暂时没有支持异步事件,其实从事件这个词语来看,其隐式已经包含了异步的概念,因为事件的发生是随机的,事件的响应不一定同步处理,比如你正在做一件事,另外一个事件发生了,接到一个电话,你知道某个事件发生,但你可能还是要继续做你之前做的事情,不一定立即中断手上的活去响应处理电话的事件。

从实现机制来看,同步实现其实就是一个线程做所有的事情,万一其中一个环节堵塞或处理慢了,会影响后续的处理,类似串联电路。而异步一般是多个线程可能是并发处理所有事情,这样利用CPU多核同时工作,效率高,避免了“单点风险”,某个环节或点出现堵塞或处理时间过长,不会影响后续处理。

那么多个线程同时工作后,线程之间要传递数据怎么办呢?就像四百米接力棒比赛,你要和把数据棒传给下一个啊,Disruptor就起到这个作用。

更多并发编程:http://www.jdon.com/concurrency.html

异步编程:http://www.jdon.com/asynchronous.html

但似乎使用spring之类的框架之后,架构底层已经被spring封装好了,不太需要多线程的事情,就像我在帖子中提到的,每个客户端连接进来,servlet容器会自动的分发一个线程去处理请求,那么关于这个请求的所有操作都会在这个线程中执行。
如果考虑到当前这个线程中某些逻辑有阻塞情况下,就可以使用多线程(在当前线程之外单开一个或多个线程)来仅进行处理,然后当前线程继续往下执行,并利用disruptor来进行线程之间的数据传递。
不知道我的理解对不对?

2014-03-17 10:13 "@wilsonp"的内容
考虑到当前这个线程中某些逻辑有阻塞情况下,就可以使用多线程 ...

这是因为Spring缺省是同步机制,那我们只能有意识的主动去使用异步,但是这种主动必须你能够知道哪些地方阻塞,而只有等待编程完成以后测试时才会发现,再去重构比较麻烦。

而使用事件驱动的框架如NodeJS等,它们缺省是异步机制,到处异步,这样能预防在先。

以腐败为例子,Spring和JavaEE等现有同步机制是等腐败发生后去纠正,而事件驱动编程能够做到预先预防腐败,当然带来缺点是编程思路不同。

参考:http://www.jdon.com/46189