在本教程中,我们将学习实现一个简单的 SpringBoot 应用程序,该应用程序实现了 CQRS 原理。
在这个用例中,我们将使用 Axon 4 框架。为了存储事件,我们将使用Mongo DB,而不是存储和查询数据的存储库,我们将使用静态 Map。
CQRS — 命令查询分离:简单来说,要么对数据执行操作,要么查询并返回数据。
Axon 是一个开源 Java 框架,用于开发事件驱动的微服务。Axon 带有 Axon 框架和 Axon 服务器。我们不会在本教程中使用 Axon 服务器,相反,我们将实现我们自己的事件存储。
持久层可以使用 Axon 提供的合适的 EventStorageEngine 实现,但是因为我们使用Mongo DB 时,我们将使用MongoEventStorageEngine.
设置
添加到我们的SpringBoot 项目(SpringBoot 2.5.0):
<dependency> <groupId>org.axonframework</groupId> <artifactId>axon-spring-boot-starter</artifactId> <version>${axon.version}</version> <exclusions> <exclusion> <groupId>org.axonframework</groupId> <artifactId>axon-server-connector</artifactId> </exclusion> <exclusion> <groupId>com.thoughtworks.xstream</groupId> <artifactId>xstream</artifactId> </exclusion> </exclusions> </dependency> <!-- https://mvnrepository.com/artifact/org.axonframework.extensions.mongo/axon-mongo --> <dependency> <groupId>org.axonframework.extensions.mongo</groupId> <artifactId>axon-mongo</artifactId> <version>4.2</version> </dependency> <!-- https://mvnrepository.com/artifact/com.thoughtworks.xstream/xstream --> <dependency> <groupId>com.thoughtworks.xstream</groupId> <artifactId>xstream</artifactId> <version>1.4.17</version> </dependency>
|
请注意,我们排除了默认的 xstream 和 axon-server-connector 依赖项。
axon-server-connector 被排除在外,因为我们依赖于 Axon Server。
与 axon 捆绑在一起的默认 XStream jar (1.4.10) 会导致以下错误:
“Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make field private final java.util.Comparator java.util.TreeMap.comparator accessible: module java.base does not “opens java.util” to unnamed module @703580bf”
控制器
定义控制器: @Autowired private CommandGateway commandGateway; @PostMapping("/order") public ResponseEntity<Void> createOrder(@RequestBody Order order){ CreateOrderCommand command = new CreateOrderCommand(order.getOrderId(), order.getOrderAmount()); commandGateway.send(command); return ResponseEntity.ok().build(); }
CommandGateway将根据命令类型将命令发送到CommandHandler,在我们的示例中,命令类型为CreateOrderCommand。我们只能为一个命令定义一个命令处理程序。
|
命令处理程序
下面是命令处理程序:
@Aggregate public class OrderAggregate { @AggregateIdentifier private String orderId; private Double orderAmount; public OrderAggregate() {} @CommandHandler public OrderAggregate(CreateOrderCommand command) { CreateOrderEvent event = new CreateOrderEvent(command.getOrderId(), command.getOrderAmount()); AggregateLifecycle.apply(event); } @EventSourcingHandler public void on(CreateOrderEvent event) { this.orderId = event.getOrderId(); this.orderAmount = event.getOrderAmount(); }
|
将聚合视为包含状态的对象以及可以更改聚合状态的方法。聚合必须具有使用AggregateIdentifier注释的标识符。
一旦apply方法被调用,事件消息就会在aggregate的范围内发布。在我们的示例中,这意味着EventSourcingHandler 将调用由 注释的方法并更新订单域的状态。
一旦事件保存在事件存储中,事件总线将调用所有外部事件处理程序。
事件处理程序
@Component public class OrderQueryHandler { @Autowired private OrderRepository orderRepository; @EventHandler public void on(CreateOrderEvent event) { orderRepository.storeOrder(new Order(event.getOrderId(), null, event.getOrderAmount())); } }
|
EventHandler 用作实际持久化事件的投影。在我们的示例中,我们调用OrderRepository来存储订单详细信息,然后QueryHandlers(不包含在此应用程序中)将使用这些详细信息来获取数据。
配置定义
@Bean public MongoClient mongoClient(){ MongoClient mongoClient = new MongoClient("localhost", 27017); return mongoClient; } @Bean public EventStorageEngine storageEngine(MongoClient client) { return MongoEventStorageEngine.builder().mongoTemplate(DefaultMongoTemplate.builder().mongoDatabase(client).build()).build(); } @Bean public EmbeddedEventStore eventStore(EventStorageEngine storageEngine, AxonConfiguration configuration) { return EmbeddedEventStore.builder() .storageEngine(storageEngine) .messageMonitor(configuration.messageMonitor(EventStore.class, "eventStore")).build(); }
|
当我们定义我们自己的事件存储来持久化事件的状态时,我们需要定义EventStore bean 和MongoEventStorageEngine来使用Mongo作为事件存储。
结论
Axon 更像是实现 CQRS 的构建块,框架本身并不强制特定的实现方法。我们可以选择适合我们需求的解决方案。
可以通过GitHub访问此应用程序的完整代码。