在本教程中,我们将学习实现一个简单的 SpringBoot 应用程序,该应用程序实现了 CQRS 原理。
在这个用例中,我们将使用 Axon 4 框架。为了存储事件,我们将使用Mongo DB,而不是存储和查询数据的存储库,我们将使用静态 Map。
CQRS — 命令查询分离:简单来说,要么对数据执行操作,要么查询并返回数据。
Axon 是一个开源 Java 框架,用于开发事件驱动的微服务。Axon 带有 Axon 框架和 Axon 服务器。我们不会在本教程中使用 Axon 服务器,相反,我们将实现我们自己的事件存储。
持久层可以使用 Axon 提供的合适的 EventStorageEngine 实现,但是因为我们使用Mongo DB 时,我们将使用MongoEventStorageEngine.
 
设置
添加到我们的SpringBoot 项目(SpringBoot 2.5.0):
| <dependency><groupId>org.axonframework</groupId>
 <artifactId>axon-spring-boot-starter</artifactId>
 <version>${axon.version}</version>
 <exclusions>
 <exclusion>
 <groupId>org.axonframework</groupId>
 <artifactId>axon-server-connector</artifactId>
 </exclusion>
 <exclusion>
 <groupId>com.thoughtworks.xstream</groupId>
 <artifactId>xstream</artifactId>
 </exclusion>
 </exclusions>
 </dependency>
 <!-- https://mvnrepository.com/artifact/org.axonframework.extensions.mongo/axon-mongo -->
 <dependency>
 <groupId>org.axonframework.extensions.mongo</groupId>
 <artifactId>axon-mongo</artifactId>
 <version>4.2</version>
 </dependency>
 <!-- https://mvnrepository.com/artifact/com.thoughtworks.xstream/xstream -->
 <dependency>
 <groupId>com.thoughtworks.xstream</groupId>
 <artifactId>xstream</artifactId>
 <version>1.4.17</version>
 </dependency>
 
 | 
请注意,我们排除了默认的 xstream 和 axon-server-connector 依赖项。
axon-server-connector 被排除在外,因为我们依赖于 Axon Server。
与 axon 捆绑在一起的默认 XStream jar (1.4.10) 会导致以下错误:
“Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make field private final java.util.Comparator java.util.TreeMap.comparator accessible: module java.base does not “opens java.util” to unnamed module @703580bf”
 
控制器
| 定义控制器:@Autowired
 private CommandGateway commandGateway;
 @PostMapping("/order")
 public ResponseEntity<Void> createOrder(@RequestBody Order order){
 CreateOrderCommand command = new CreateOrderCommand(order.getOrderId(), order.getOrderAmount());
 commandGateway.send(command);
 return ResponseEntity.ok().build();
 }
 
 CommandGateway将根据命令类型将命令发送到CommandHandler,在我们的示例中,命令类型为CreateOrderCommand。我们只能为一个命令定义一个命令处理程序。
 
 
 | 
命令处理程序
下面是命令处理程序:
| @Aggregatepublic class OrderAggregate {
 @AggregateIdentifier
 private String orderId;
 private Double orderAmount;
 public OrderAggregate() {}
 @CommandHandler
 public OrderAggregate(CreateOrderCommand command) {
 CreateOrderEvent event = new CreateOrderEvent(command.getOrderId(), command.getOrderAmount());
 AggregateLifecycle.apply(event);
 }
 @EventSourcingHandler
 public void on(CreateOrderEvent event) {
 this.orderId = event.getOrderId();
 this.orderAmount = event.getOrderAmount();
 }
 
 | 
将聚合视为包含状态的对象以及可以更改聚合状态的方法。聚合必须具有使用AggregateIdentifier注释的标识符。
一旦apply方法被调用,事件消息就会在aggregate的范围内发布。在我们的示例中,这意味着EventSourcingHandler 将调用由 注释的方法并更新订单域的状态。
一旦事件保存在事件存储中,事件总线将调用所有外部事件处理程序。
 
事件处理程序
| @Componentpublic class OrderQueryHandler {
 @Autowired
 private OrderRepository orderRepository;
 @EventHandler
 public void on(CreateOrderEvent event) {
 orderRepository.storeOrder(new Order(event.getOrderId(), null, event.getOrderAmount()));
 }
 }
 
 | 
EventHandler 用作实际持久化事件的投影。在我们的示例中,我们调用OrderRepository来存储订单详细信息,然后QueryHandlers(不包含在此应用程序中)将使用这些详细信息来获取数据。
 
配置定义
| @Bean public MongoClient mongoClient(){
 MongoClient mongoClient = new MongoClient("localhost", 27017);   return mongoClient;
 }
 @Bean
 public EventStorageEngine storageEngine(MongoClient client) {     return MongoEventStorageEngine.builder().mongoTemplate(DefaultMongoTemplate.builder().mongoDatabase(client).build()).build();
 }
 @Bean
 public EmbeddedEventStore eventStore(EventStorageEngine storageEngine, AxonConfiguration configuration) {
 return EmbeddedEventStore.builder()             .storageEngine(storageEngine)             .messageMonitor(configuration.messageMonitor(EventStore.class, "eventStore")).build();
 }
 
 | 
当我们定义我们自己的事件存储来持久化事件的状态时,我们需要定义EventStore bean 和MongoEventStorageEngine来使用Mongo作为事件存储。
结论
Axon 更像是实现 CQRS 的构建块,框架本身并不强制特定的实现方法。我们可以选择适合我们需求的解决方案。
可以通过GitHub访问此应用程序的完整代码。