基于spring来实现crqs的一些问题
初学CQRS架构,想在新项目中使用CQRS来进行架构,采用的是java来进行开发,架构采用传统的springMvc+Jpa(Hibernate实现)+Spring4.0+Mysql,暂时没有考虑EventSource的问题,因为事件溯源暂时还有些无法理解。
1、命令总线(CommandBus)实现:
命令总线使用一个Collection来保存所有的命令执行器(ICommandExecutor),在Spring启动的时候通过IOC进行加载注入,当命令进入的时候就去总线中通过反射查询相应的Executor并向后执行(比如调用Domain或DomainService),代码如下:
@[author]Component[/author]
public class DefaultCommandBus implements ICommandBus<ICommandExecutor<ICommand>>{
@[author]Autowired[/author]
private List<ICommandExecutor> executors;
public DefaultCommandBus() {}
//执行命令
public void send(ICommand command) {
ICommandExecutor<ICommand> executor = findCommandExecutor(command);
executor.execute(command);
}
//在所有的命令执行器集合中查找符合条件的执行器
private ICommandExecutor<ICommand> findCommandExecutor(ICommand command){
for (ICommandExecutor<ICommand> ce : executors) {
CommandListener ric = ce.getClass().getAnnotation(CommandListener.class);
Class<? extends ICommand> clazz = ric.commandClass();
if(clazz.getName().equals(command.getClass().getName())){
return ce;
}
}
return null;
}
}
2、事件聚合器(IEventAggragete)实现:(暂时没支持异步事件)
采用了和命令总线差不多的实现方法。
但由于事件和事件处理器是一对多的关系,所以这里采用了Map来实现,Key为指定事件的名称,Values为一个List<IEventHandler>,当Domain或DomainService使用聚合器Pulish一个消息的时候,就去聚合器中查找相应事件对应的一组事件处理器,并循环执行,代码如下:
@[author]Component[/author]
public class EventAggregate implements IEventAggregate {
//实现执行结果回调
private ICallBack callBack;
//注册所有的事件及对应的监听器组
private Map<String,List<IEventHandler<IEvent>>> handlers = new HashMap<String, List<IEventHandler<IEvent>>>();
@[author]Autowired[/author]
private List<IEventHandler> eventHandlers;
//自动订阅事件监听器
@PostConstruct
public void autoSubcribe() {
for (IEventHandler<IEvent> eventHandler : eventHandlers) {
EventListener eventAnno = eventHandler.getClass().getAnnotation(EventListener.class);
if(eventAnno != null){
String eventTypeName = eventAnno.eventClass().getName();//获取该handler感兴趣的事件类的名称
List<IEventHandler<IEvent>> ehs = findEventType(eventTypeName);//在聚合中查找是否有该事件的处理器已经注册
if(ehs != null){//如果存在
ehs.add(eventHandler);
handlers.put(eventTypeName, ehs);
}else{
ehs = new ArrayList<IEventHandler<IEvent>>();
ehs.add(eventHandler);
handlers.put(eventTypeName, ehs);
}
}
}
}
private List<IEventHandler<IEvent>> findEventType(String eventTypeName){
if(handlers.size() != 0){
return handlers.get(eventTypeName);
}
return null;
}
//订阅事件
public <TEvent extends IEvent> void subcribe(IEventHandler<IEvent> handler) {
}
//解约事件处理器
public <TEvent extends IEvent> void desubcribe(IEventHandler<IEvent> handler) {
}
//领域对象发布消息
public <TEvent extends IEvent> void publish(TEvent event) {
String eventTypeName = event.getClass().getName();
List<IEventHandler<IEvent>> ehs = findEventType(eventTypeName);
if(ehs != null){
for (IEventHandler<IEvent> iEventHandler : ehs) {
iEventHandler.handle(event);
if(callBack != null){
callBack.callBack();
}
}
}
}
public void setCallBack(ICallBack callBack) {
this.callBack = callBack;
}
}
支持回调,如果需要进行回调,则需要注入ICallback接口的实现类(一般是实现了ICallback接口的Domain或DominService对象)
以上两点就是CQRS的Command部分的核心实现了,但总觉得有些问题:
1、SpringMVC的前端控制器是DispatherServlet,其实也是servlet,所以必须基于容器来接受请求,当客户端请求进来之后,容器(比如Jetty)会从线程池中调取一个空闲的线程来为该客户端服务,这就保证了每个客户端都有一个指定的线程来处理,即不存在资源竞争的问题(Spring采用了大量的ThreadLocal来保证了线程安全),那是否还需要采用类似Disruptor这种框架来封装命令或事件的派发,因为Disruptor框架也是用来解决安全的发送消息问题的框架;
2、领域对象和领域服务会显示的和事件聚合器发生耦合。(这里如果采用Disruptor这类框架来封装消息的发送,比如封装为静态方法,这才会出现资源竞争的问题,但只有这样才能和Domain或DomainService解耦)
不知道大家对以上两点的看法是怎么样的?性能上的问题还未考虑,如果只是单线程没有资源竞争的情况下,性能应该没有太大问题(一个类似于Tomcat或Jetty的web服务器400左右的并发应该不错了,毕竟是Http的短连接)
[该贴被wilsonp于2014-03-16 20:40修改过]
[该贴被wilsonp于2014-03-16 20:42修改过]
[该贴被wilsonp于2014-03-16 21:51修改过]
[该贴被wilsonp于2014-03-16 21:52修改过]