基于spring来实现crqs的一些问题

14-03-16 wilsonp
初学CQRS架构,想在新项目中使用CQRS来进行架构,采用的是java来进行开发,架构采用传统的springMvc+Jpa(Hibernate实现)+Spring4.0+Mysql,暂时没有考虑EventSource的问题,因为事件溯源暂时还有些无法理解。

1、命令总线(CommandBus)实现:

命令总线使用一个Collection来保存所有的命令执行器(ICommandExecutor),在Spring启动的时候通过IOC进行加载注入,当命令进入的时候就去总线中通过反射查询相应的Executor并向后执行(比如调用Domain或DomainService),代码如下:

@[author]Component[/author]

public class DefaultCommandBus implements ICommandBus<ICommandExecutor<ICommand>>{

@[author]Autowired[/author]

private List<ICommandExecutor> executors;

public DefaultCommandBus() {}

//执行命令

public void send(ICommand command) {

ICommandExecutor<ICommand> executor = findCommandExecutor(command);

executor.execute(command);

}

//在所有的命令执行器集合中查找符合条件的执行器

private ICommandExecutor<ICommand> findCommandExecutor(ICommand command){

for (ICommandExecutor<ICommand> ce : executors) {

CommandListener ric = ce.getClass().getAnnotation(CommandListener.class);

Class<? extends ICommand> clazz = ric.commandClass();

if(clazz.getName().equals(command.getClass().getName())){

return ce;

}

}

return null;

}

}

2、事件聚合器(IEventAggragete)实现:(暂时没支持异步事件)

采用了和命令总线差不多的实现方法。

但由于事件和事件处理器是一对多的关系,所以这里采用了Map来实现,Key为指定事件的名称,Values为一个List<IEventHandler>,当Domain或DomainService使用聚合器Pulish一个消息的时候,就去聚合器中查找相应事件对应的一组事件处理器,并循环执行,代码如下:

@[author]Component[/author]

public class EventAggregate implements IEventAggregate {

//实现执行结果回调

private ICallBack callBack;

//注册所有的事件及对应的监听器组

private Map<String,List<IEventHandler<IEvent>>> handlers = new HashMap<String, List<IEventHandler<IEvent>>>();

@[author]Autowired[/author]

private List<IEventHandler> eventHandlers;

//自动订阅事件监听器

@PostConstruct

public void autoSubcribe() {

for (IEventHandler<IEvent> eventHandler : eventHandlers) {

EventListener eventAnno = eventHandler.getClass().getAnnotation(EventListener.class);

if(eventAnno != null){

String eventTypeName = eventAnno.eventClass().getName();//获取该handler感兴趣的事件类的名称

List<IEventHandler<IEvent>> ehs = findEventType(eventTypeName);//在聚合中查找是否有该事件的处理器已经注册

if(ehs != null){//如果存在

ehs.add(eventHandler);

handlers.put(eventTypeName, ehs);

}else{

ehs = new ArrayList<IEventHandler<IEvent>>();

ehs.add(eventHandler);

handlers.put(eventTypeName, ehs);

}

}

}

}

private List<IEventHandler<IEvent>> findEventType(String eventTypeName){

if(handlers.size() != 0){

return handlers.get(eventTypeName);

}

return null;

}

//订阅事件

public <TEvent extends IEvent> void subcribe(IEventHandler<IEvent> handler) {

}

//解约事件处理器

public <TEvent extends IEvent> void desubcribe(IEventHandler<IEvent> handler) {

}

//领域对象发布消息

public <TEvent extends IEvent> void publish(TEvent event) {

String eventTypeName = event.getClass().getName();

List<IEventHandler<IEvent>> ehs = findEventType(eventTypeName);

if(ehs != null){

for (IEventHandler<IEvent> iEventHandler : ehs) {

iEventHandler.handle(event);

if(callBack != null){

callBack.callBack();

}

}

}

}

public void setCallBack(ICallBack callBack) {

this.callBack = callBack;

}

}

支持回调,如果需要进行回调,则需要注入ICallback接口的实现类(一般是实现了ICallback接口的Domain或DominService对象)

以上两点就是CQRS的Command部分的核心实现了,但总觉得有些问题:

1、SpringMVC的前端控制器是DispatherServlet,其实也是servlet,所以必须基于容器来接受请求,当客户端请求进来之后,容器(比如Jetty)会从线程池中调取一个空闲的线程来为该客户端服务,这就保证了每个客户端都有一个指定的线程来处理,即不存在资源竞争的问题(Spring采用了大量的ThreadLocal来保证了线程安全),那是否还需要采用类似Disruptor这种框架来封装命令或事件的派发,因为Disruptor框架也是用来解决安全的发送消息问题的框架;

2、领域对象和领域服务会显示的和事件聚合器发生耦合。(这里如果采用Disruptor这类框架来封装消息的发送,比如封装为静态方法,这才会出现资源竞争的问题,但只有这样才能和Domain或DomainService解耦)

不知道大家对以上两点的看法是怎么样的?性能上的问题还未考虑,如果只是单线程没有资源竞争的情况下,性能应该没有太大问题(一个类似于Tomcat或Jetty的web服务器400左右的并发应该不错了,毕竟是Http的短连接)

[该贴被wilsonp于2014-03-16 20:40修改过]

[该贴被wilsonp于2014-03-16 20:42修改过]

[该贴被wilsonp于2014-03-16 21:51修改过]

[该贴被wilsonp于2014-03-16 21:52修改过]

1
banq
2014-03-17 07:47
非常赞赏这种探索,挺好的。

我注意到你提到:暂时没有支持异步事件,其实从事件这个词语来看,其隐式已经包含了异步的概念,因为事件的发生是随机的,事件的响应不一定同步处理,比如你正在做一件事,另外一个事件发生了,接到一个电话,你知道某个事件发生,但你可能还是要继续做你之前做的事情,不一定立即中断手上的活去响应处理电话的事件。

从实现机制来看,同步实现其实就是一个线程做所有的事情,万一其中一个环节堵塞或处理慢了,会影响后续的处理,类似串联电路。而异步一般是多个线程可能是并发处理所有事情,这样利用CPU多核同时工作,效率高,避免了“单点风险”,某个环节或点出现堵塞或处理时间过长,不会影响后续处理。

那么多个线程同时工作后,线程之间要传递数据怎么办呢?就像四百米接力棒比赛,你要和把数据棒传给下一个啊,Disruptor就起到这个作用。

更多并发编程:http://www.jdon.com/concurrency.html

异步编程:http://www.jdon.com/asynchronous.html

wilsonp
2014-03-17 10:13
但似乎使用spring之类的框架之后,架构底层已经被spring封装好了,不太需要多线程的事情,就像我在帖子中提到的,每个客户端连接进来,servlet容器会自动的分发一个线程去处理请求,那么关于这个请求的所有操作都会在这个线程中执行。

如果考虑到当前这个线程中某些逻辑有阻塞情况下,就可以使用多线程(在当前线程之外单开一个或多个线程)来仅进行处理,然后当前线程继续往下执行,并利用disruptor来进行线程之间的数据传递。

不知道我的理解对不对?

banq
2014-03-17 10:53
2014-03-17 10:13 "@wilsonp"的内容
考虑到当前这个线程中某些逻辑有阻塞情况下,就可以使用多线程 ...

这是因为Spring缺省是同步机制,那我们只能有意识的主动去使用异步,但是这种主动必须你能够知道哪些地方阻塞,而只有等待编程完成以后测试时才会发现,再去重构比较麻烦。

而使用事件驱动的框架如NodeJS等,它们缺省是异步机制,到处异步,这样能预防在先。

以腐败为例子,Spring和JavaEE等现有同步机制是等腐败发生后去纠正,而事件驱动编程能够做到预先预防腐败,当然带来缺点是编程思路不同。

参考:http://www.jdon.com/46189

猜你喜欢