Axon 4结合SpringBoot和Mongo DB微服务的简单实现 - javarevisited


在本教程中,我们将学习实现一个简单的 SpringBoot 应用程序,该应用程序实现了 CQRS 原理。
在这个用例中,我们将使用 Axon 4 框架。为了存储事件,我们将使用Mongo DB,而不是存储和查询数据的存储库,我们将使用静态 Map。
CQRS — 命令查询分离:简单来说,要么对数据执行操作,要么查询并返回数据。
Axon 是一个开源 Java 框架,用于开发事件驱动的微服务。Axon 带有 Axon 框架和 Axon 服务器。我们不会在本教程中使用 Axon 服务器,相反,我们将实现我们自己的事件存储。
持久层可以使用 Axon 提供的合适的 EventStorageEngine 实现,但是因为我们使用Mongo DB 时,我们将使用MongoEventStorageEngine.
 
设置
添加到我们的SpringBoot 项目(SpringBoot 2.5.0):

<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-spring-boot-starter</artifactId>
<version>${axon.version}</version>
<exclusions>
 <exclusion>
  <groupId>org.axonframework</groupId>
  <artifactId>axon-server-connector</artifactId>
 </exclusion>
 <exclusion>
  <groupId>com.thoughtworks.xstream</groupId>
  <artifactId>xstream</artifactId>
 </exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.axonframework.extensions.mongo/axon-mongo -->
<dependency>
<groupId>org.axonframework.extensions.mongo</groupId>
<artifactId>axon-mongo</artifactId>
<version>4.2</version>
</dependency>
<!-- https:
//mvnrepository.com/artifact/com.thoughtworks.xstream/xstream -->
<dependency>
<groupId>com.thoughtworks.xstream</groupId>
<artifactId>xstream</artifactId>
<version>1.4.17</version>
</dependency>

请注意,我们排除了默认的 xstream 和 axon-server-connector 依赖项。
axon-server-connector 被排除在外,因为我们依赖于 Axon Server。
与 axon 捆绑在一起的默认 XStream jar (1.4.10) 会导致以下错误:
“Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make field private final java.util.Comparator java.util.TreeMap.comparator accessible: module java.base does not “opens java.util” to unnamed module @703580bf”
 
控制器
定义控制器:
@Autowired
private CommandGateway commandGateway;
@PostMapping("/order")
public ResponseEntity<Void> createOrder(@RequestBody Order order){
CreateOrderCommand command = new CreateOrderCommand(order.getOrderId(), order.getOrderAmount());
commandGateway.send(command);
return ResponseEntity.ok().build();
}

CommandGateway将根据命令类型将命令发送到CommandHandler,在我们的示例中,命令类型为CreateOrderCommand。我们只能为一个命令定义一个命令处理程序。
 

命令处理程序
下面是命令处理程序:
@Aggregate
public class OrderAggregate {
@AggregateIdentifier
private String orderId;
private Double orderAmount;
public OrderAggregate() {}
@CommandHandler
public OrderAggregate(CreateOrderCommand command) {
CreateOrderEvent event = new CreateOrderEvent(command.getOrderId(), command.getOrderAmount());
AggregateLifecycle.apply(event);
}
@EventSourcingHandler
public void on(CreateOrderEvent event) {
this.orderId = event.getOrderId();
this.orderAmount = event.getOrderAmount();
}

将聚合视为包含状态的对象以及可以更改聚合状态的方法。聚合必须具有使用AggregateIdentifier注释的标识符。
一旦apply方法被调用,事件消息就会在aggregate的范围内发布。在我们的示例中,这意味着EventSourcingHandler 将调用由 注释的方法并更新订单域的状态。
一旦事件保存在事件存储中,事件总线将调用所有外部事件处理程序。
 
事件处理程序
@Component
public class OrderQueryHandler {
@Autowired
private OrderRepository orderRepository;
@EventHandler
public void on(CreateOrderEvent event) {
orderRepository.storeOrder(new Order(event.getOrderId(), null, event.getOrderAmount()));
}
}

EventHandler 用作实际持久化事件的投影。在我们的示例中,我们调用OrderRepository来存储订单详细信息,然后QueryHandlers(不包含在此应用程序中)将使用这些详细信息来获取数据。
 
配置定义
@Bean 
public MongoClient mongoClient(){   
MongoClient mongoClient = new MongoClient("localhost", 27017);   return mongoClient;  
}  
@Bean 
public EventStorageEngine storageEngine(MongoClient client) {     return MongoEventStorageEngine.builder().mongoTemplate(DefaultMongoTemplate.builder().mongoDatabase(client).build()).build(); 
}  
@Bean 
public EmbeddedEventStore eventStore(EventStorageEngine storageEngine, AxonConfiguration configuration) {     
return EmbeddedEventStore.builder()             .storageEngine(storageEngine)             .messageMonitor(configuration.messageMonitor(EventStore.class,
"eventStore")).build(); 
}

当我们定义我们自己的事件存储来持久化事件的状态时,我们需要定义EventStore bean 和MongoEventStorageEngine来使用Mongo作为事件存储。

结论
Axon 更像是实现 CQRS 的构建块,框架本身并不强制特定的实现方法。我们可以选择适合我们需求的解决方案。
可以通过GitHub访问此应用程序的完整代码。