Jdon框架CQRS入门

  适合JdonFramework 6.6.8以后版本。阅读本入门需要有DDD领域驱动设计CQRS知识。

  在Jdon框架中有两个模型: Component(组件) 和 Model(领域模型). 分别以@Component, 和 @Model 标注。

  当一个Model被外部组件访问,它一般是DDD中的聚合根实体,因为根据DDD只有聚合根实体才能被外界访问引用,外部不能直接访问聚合边界内其它对象,必须通过聚合根,这样聚合根才能保证聚合边界内各个对象变化的一致性。

  如果一个Model被其他领域模型引用,它就肯定不是聚合根,因为聚合根之间不能直接相互引用,它可能是聚合内一个对象,或者是实体或者是值对象。

  领域模型Model实例生活在in-memory内存缓存中, 而组件Component实例的生命周期是应用级别,比如和Web容器相同,一个容器内缺省是一个单例。

  Component能够用来实现DDD的服务service或其他应用管理器,如邮件发送等。

  Jdon框架也提供一种类似Component的Service类型 (标注为@Service),它是面向外部客户端,而不是面向内部,可用来实现SOA的粗粒度大服务。

  Jdon在这两个模型(Component和Model)之间提供四种异步并发的通讯方式,也是一种Producer/Consumer模式。

  1.组件和模型 Component -----> model

  2. 模型和组件 model ------->Component

  3. 组件与组件 Compponent ------> Component

  4. 模型与模型 model------> model

  当一个组件或服务Component/Service发送消息给领域模型Model(也就是聚合根aggregate root), 在CQRS中我们称这个消息携带的是命令command, 当一个领域模型model发送消息给组件Component, 我们称它为事件,代表已经在领域模型中发生什么事情。

  一个命令激活聚合根模型的行为,如上面的startMatch方法,然后在这个方法执行时,一个事件也发生了,这个事件可激活其他聚合根或组件协同工作。比如让Repository组件保存模型自身等。

actors model

  下面谈谈这四种通讯方式如何使用Jdon实现:

1. 组件与模型

  Component(producer with @Component) -- > Model(consumer with @Model)

  在这个方式下,其实代表CQRS的Command, 一个命令可能来自UI或其他聚合根的事件,将发往聚合根实体,一个命令激活聚合根实体的一个方法行为。

  这种方式下生产者和消费者producer:consumer只能是1:1, 一个命令只能发往一个聚合根实体模型,由这个聚合根模型根据业务规则检查命令是否有效,是否可以执行等等。

  下面是一个生产者为组件的代码,使用 @Component:标注。组件要求有接口和实现两个类。

 

package com.jdon.sample.test.command;

import com.jdon.annotation.model.Send;

public interface AICommand {

  @Send("CommandmaTest")
  public TestCommand ma(@Receiver BModel bModel);

}

@Component("producer")
@Introduce("componentmessage")
public class A implements AICommand {

 public TestCommand ma(BModel bModel) {
  System.out.print("send to BModel =" + bModel.getId());
  return new TestCommand(99);
 }
}

  方法名为 "ma"是用元注解 @Send,对于发送一个命令,还必须在该方法的输入参数中指定该命令发往哪个具体的聚合根实体实例,所以要使用 @Receiver.来指定,这里是BModel实例。


@Model
public class BModel {
  private String id;

  private int state = 100;

  public BModel(String id) {
    super();
    this.id = id;
  }

  @OnCommand("CommandmaTest")
  public void save(TestCommand testCommand) {
    this.state = testCommand.getInput() + state;
    testCommand.setOutput(state);

  }

  public String getId() {
    return id;
  }

  public void setId(String id) {
    this.id = id;
  }

}

  这里命令的生产者和消费者之间是通过主题队列Queue名为"CommandmaTest"进行联系,这个名词全局必须唯一,代表两者之间是1:1,而不是1:N的关系。OnCommand代表消费者consumer响应的方法。

  整个调用过程是:生产者AICommand实例的ma方法命令消费者BModel的save方法立即执行,生产者AICommand代表一个组件Component,而消费者BModel是一个Model。

  客户端代码:

AppUtil appUtil = new AppUtil();
AICommand a = (AICommand) appUtil.getComponentInstance("producerforCommand");
BModel bModel = new BModel("one");
TestCommand testCommand = a.ma(bModel);
int i = 0;
long start = System.currentTimeMillis();
while (testCommand.getOutput() != 199) {
i++;
}
long stop = System.currentTimeMillis();
Assert.assertEquals(testCommand.getOutput(), 199);
System.out.print("ok " + " " + (stop - start));

  输出结果:

  send to BModel =oneok 5

  以上完整可运行源码见github

  这个模式下,组件component承接来自UI或其他事件转化为的命令,也可以称为command handler,如下:

  UI --->commandHandler(@Component) ---->聚合根 aggregate root

  聚合根的方法被命令激活执行,执行是单线程单写(单写原则),基于每秒处理600万订单的Ringbuffer,类似Node.js事件驱动,因此某个时刻只有一个线程修改模型内部状态,避免了使用锁或两阶段事务等低吞吐量低效能方式。相关讨论可见:SOA并不能解决高并发事务

@Model
public class AggregateRootA {

    private int state = 100;

    @Inject   //event Observable(Producer)
    private DomainEventProduceIF domainEventProducer;

    @OnCommand("CommandtoEventA")  //command Observers(Consumer)
    public Object save(ParameterVO parameterVO) {

        //以单线程更新聚合根内部状态
        this.state = parameterVO.getValue() + state;

        //一个reactive事件产生
        return domainEventProducer.sendtoAnotherAggragate(aggregateRootBId, this.state);

    }

}

 

2.模型和组件

   Model(producer with @Model) -- > Component(consumer with @Component)

  当一个聚合根实体模型接受到命令然后执行以后,它会在方法执行过程中激活reactive一个事件,这个事件称为领域事件domain events,将被发往其他组件或其他聚合根实体模型。

  为了实现聚合根模型作为事件的生产者,我们可以将一个组件Component(with @Component) 注入到模型中,这样模型就作为生产者。

  聚合根模型代码:

@Model
public class MyModel {

  private Long id;
  private String name;

  @Inject //inject the Component into this domain model
  private MyModelDomainEvent myModelDomainEvent;

  public String getName() {
    if (this.name == null) {
      DomainMessage message = myModelDomainEvent.asyncFindName(this);
      this.name = (String) message.getBlockEventResult();
    }
    return name;
  }

....

}

  我们使用了@Inject,将MyModelDomainEvent实例注入到"MyModel"中,而"MyModelDomainEvent" 是一个组件,在其中我们完成事件生产者的发送方法::

package com.jdon.sample.test.domain.simplecase;

import com.jdon.annotation.Introduce;
import com.jdon.annotation.model.Send;
import com.jdon.domain.message.DomainMessage;

@Introduce("message")
public class MyModelDomainEvent {

  @Send("MyModel.findName")
  public DomainMessage asyncFindName(MyModel myModel) {
    return new DomainMessage(myModel);
  }

  @Send("saveMyModel")
  public DomainMessage save(MyModel myModel) {
    return new DomainMessage(myModel);
  }

}

  MyModelDomainEvent必须标注为 @Introduce("message"), 表示引入一个拦截器叫message,这是在Jdon框架aspect.xml中事先定义的。在这个事件的生产者类中有两个主题topic, 注意这里是主题,而不是队列Queue,表示每个主题topic可以实现producer:consumer为1:N的事件发送。

  下面看看对于领域事件的响应器也就是消费者的代码,消费者也是一个组件:

@Consumer("MyModel.findName")
public class FindNameListener implements DomainEventHandler {

  public void onEvent(EventDisruptor event, boolean endOfBatch) throws Exception {
    MyModel myModel = (MyModel) event.getDomainMessage().getEventSource();
    System.out.println("Asynchronous eventMessage=" + myModel.getId());
    event.getDomainMessage().setEventResult("Asynchronous eventMessage=" + myModel.getId());
  }
}

  FindNameListener是使用新的元注解@Consumer, 注意不是 @Component, 表示这是一个消费者组件,使用@Consumer, 这个类就必须继承实现接口DomainEventHandler, 然后在其方法onEvent中完成对某个topic的生产者响应。

  如果使用@Component, 就必须使用@OnEvent标注你.自己的对某个topic响应的方法。这是两种不同的消费者写法,一个topic可以有多个消费者,执行顺序是按照类的包名完整字符串排列。

  下面是后一种消费额写法,实现生产者@Send("saveMyModel")的消费:

@Component("mymrepository")
@Introduce("modelCache")
public class RepositoryImp implements MyModelRepository {

  @Around
  public MyModel getModel(Long key) {
    MyModel mym = new MyModel();
    mym.setId(key);
    return mym;
  }

  @OnEvent("saveMyModel")
  public void save(MyModel myModel) {
    System.out.print("\n No.1 @OnEvent:" + this.getClass().getName());

  }

}

  这个消费者是RepositoryImp , 它是DDD的仓储Repository 实现,主要负责从仓库或者数据库还原一个完整的聚合根实体对象。

  @Introduce("modelCache") 和 @Around配合使用,将聚合根模型能保存在内存缓存中,以后凡是调用此方法,总是先检查缓存是否已经存在这个模型。这两个元注释是使用Jdon必须的,否则命令和事件都无法正常运行。当然,可以手工调用com.jdon.domain.dci.RoleAssigner 的assignAggregateRoot 方法将任何一个对象扮演成一个聚合根实体。

  至此,我们已经知道了组件和模型之间两种通讯方式,以上两种结合起来如下调用流程。

  UI ------>commandHandler(@Component) ---->聚合根

  聚合根 ------>EventHandler(@Component) ----->仓储持久数据库等

  在这两种方式集合情况下,聚合根实体模型其实扮演的是类似AKKA或Erlang中的Actors模型,同样具备以下特性:

  • Share NOTHING, 没有分享
  • 隔离的事件驱动处理
  • 输入或输出通讯都是异步 无堵塞的消息

  完整源码见github

 

3.组件与组件

   Component(producer with @Component) -- > Component(consumer with @Component)

  这个模式是组件和组件之间调用方式,分两种:

  1.依赖注入同步调用

  2.事件异步调用

  自动依赖注入是Jdon框架早期的一个功能,适合@Service和@Model之间的实例自动注入,如下:

 

@Component
public class A{

  private B b;

  //通过构造器将B实例注入
  public A(B b){
    this.b = b;
  }
}

@Component
public class B{

}

  当组件B被注入到组件A以后,在A中就可以直接同步调用B的方法,还有一种更加松耦合的方法,A不再依赖B,就是通过异步并发事件实现。

  如下A类作为发布者:

 

package com.jdon.sample.test.event;

import com.jdon.annotation.model.Send;

public interface AI {

  @Send("maTest")
    public TestEvent ma();
  }

 

@Component("producer")
@Introduce("componentmessage")
public class A implements AI {

  public TestEvent ma() {
    System.out.print("event.send.ma..");
    return new TestEvent(99);
  }
}

  B是事件的消费者,它的具体消费方法需要以@OnEvent标注。

 

 

@Component("consumer")
public class B {

  @OnEvent("maTest")
  public void mb(TestEvent testEvent) throws Exception {
    testEvent.setResult(testEvent.getS() + 1);//the consumer return a result to the procuder
    System.out.print("event.@OnEvent.mb.." + testEvent.getResult() + "\n");
    Assert.assertEquals(testEvent.getResult(), 100);
  }
}

  方法 "maTest" 标注了 @OnEvent ,其topic名词需要和@Send.相同,注意,生产者的方法返回类型是 "TestEvent",必须等同于消费者的方法输入参数,这样实现两者之间共享一个消息对象,这个消息对象可以携带任何两者需要传递分享的值对象。

  如果你希望消费者也返回一个结果给生产者,那么使用com.jdon.domain.message.DomainMessage, 其中有两个方法getBlockEventResult() or getEventResult(), getBlockEventResult() 是堵塞10秒等待消费者返回结果。

  测试代码如下:

 

 

AppUtil appUtil = new AppUtil();
AI a = (AI) appUtil.getService("producer");
TestEvent te = a.ma();
long start = System.currentTimeMillis();
while (te.getResult() != 100) {
}
long stop = System.currentTimeMillis();

Assert.assertEquals(te.getResult(), 100);
System.out.print("ok " + " " + (stop - start) + "\n");

  输出output:

[junit] event.send.ma..event.@OnEvent.mb..100
[junit] ok 31

  以上代码见Github

 

4.模型与模型

  Model(aggregate root A) -- > Model(aggregate root B)

  根据Evans的定义,聚合根内部维持一致性,但是聚合根之间不可以直接引用,可以实现最终一致性(CAP定理),

  当一个聚合根实体模型需要协调另外一个聚合根时,只能通过领域事件,发出事件作为下一个聚合根的输入命令。

  这个模式实际可以分解为由前面三个模式组合起来:

  1. 聚合根A激活一个事件到组件 (model ---> Component)

  2.组件将事件转为聚合根B的命令。 (Component ---->Component)

  3.命令发往聚合根B (component -->model)

  完整代码见Github:

 

EventSourcing流程

并发修改采取单写原则,见:
http://www.jdon.com/performance/singlewriter.html

了解以上背景知识后,流程应该是这样:

  1. command进来需要修改聚合根某个方法
  2. 这个聚合根不在系统内存中,需要从仓储中获取。放到缓存中,保持最经常使用的在内存中。
    不能使用REDIS这样的需要通过Socket的服务型缓存,而是要和程序在一个JVM中的缓存,必须也是Java编写。如果是Redis外部缓存,处理方式和数据库处理一样的。
  3. 执行方法,比如更改状态等,这时只有一个线程更改状态,是单写原则,类似Node.js

           @Model
           public class AggregateRootA {

                  private int state = 100;
          
                  @Inject   //event Observable(Producer)
                  private DomainEventProduceIF domainEventProducer;
          
                  @OnCommand("CommandtoEventA")  //command Observers(Consumer)
                  public Object save(ParameterVO parameterVO) {
                 
                         //以单线程方式更新状态 (Single Writer)
                         this.state = parameterVO.getValue() + state;
                 
                        //a reactive event will be send to other consumers in domainEventProducer
                       return domainEventProducer.
                                           sendtoAnotherAggragate(aggregateRootBId,this.state);

                  }
                 
           }


  4. 状态更改后产生领域事件。 domainEventProducer.sendtoAnotherAggragate
  5. 领域事件发到disruptor中
  6. 另外线程从disruptor从读取事件。
  7. 这个线程读取事件后按类名字母排名顺序执行相应的消费者代码
  8. 在消费者consumer中,如果是CQRS架构保存事件到Eventstore,如果是SOA架构就保存当前聚合根状态到数据库,执行其他业务逻辑。

 

相关参考:

领域事件和EventSourcing

EventStore

Actor模型

并发编程

猜你喜欢