这里为 HTTP API 层选择了 Spring 框架,只是因为它非常流行。这可以是您想要的任何东西,只要记住我们正在构建一个反应式解决方案,因此使用具有非阻塞 API 的东西也是合理的,例如 Micronaut、Quarkus 等。
有ShowController2 个端点。第一个是ShowResponse通过id获取:
| @RestController@RequestMapping(value = "/shows")
 public class ShowController {
 
 private final ShowService showService;
 
 public ShowController(ShowService showService) {
 this.showService = showService;
 }
 
 @GetMapping(value = "{showId}", produces = "application/json")
 public Mono<ShowResponse> findById(@PathVariable UUID showId) {
 CompletionStage<ShowResponse> showResponse = showService.findShowBy(ShowId.of(showId)).thenApply(ShowResponse::from);
 return Mono.fromCompletionStage(showResponse);
 }
 
 | 
由于我们使用的是 Spring WebFlux,因此我们需要转换CompletionStage为Mono以保持反应性。在标准(阻塞)控制器中,我们需要阻塞并等待ShowService响应。
第二个端点更有趣,因为它用于座位预订和取消。
| @PatchMapping(value = "{showId}/seats/{seatNum}", consumes = "application/json")public Mono<ResponseEntity<String>> reserve(@PathVariable("showId") UUID showIdValue,
 @PathVariable("seatNum") int seatNumValue,
 @RequestBody SeatActionRequest request) {
 
 ShowId showId = ShowId.of(showIdValue);
 SeatNumber seatNumber = SeatNumber.of(seatNumValue);
 CompletionStage<ShowEntityResponse> actionResult = switch (request.action()) {
 case RESERVE -> showService.reserveSeat(showId, seatNumber);
 case CANCEL_RESERVATION -> showService.cancelReservation(showId, seatNumber);
 };
 
 return Mono.fromCompletionStage(actionResult.thenApply(response -> switch (response) {
 case CommandProcessed ignored -> accepted().body(request.action() + " successful");
 case CommandRejected rejected -> badRequest().body(request.action() + " failed with: " + rejected.error().name());
 }));
 }
 
 | 
让我们跳过关于它是否是 RESTful 的讨论。在这种情况下,我们需要将来自服务的响应转换为适当的 HTTP 状态代码。
我们还需要一些基本的 Spring Beans 配置:
| @Configurationclass BaseConfiguration {
 
 @Bean
 public Config config() {
 return PersistenceTestKitPlugin.config().withFallback(ConfigFactory.load());
 }
 
 @Bean(destroyMethod = "terminate")
 public ActorSystem<Void> actorSystem(Config config) {
 return ActorSystem.create(VoidBehavior.create(), "es-workshop", config);
 }
 
 @Bean
 public ClusterSharding clusterSharding(ActorSystem<?> actorSystem) {
 return ClusterSharding.get(actorSystem);
 }
 
 @Bean
 Clock clock() {
 return new Clock.UtcClock();
 }
 }
 
 @Configuration
 class ReservationConfiguration {
 
 @Bean
 public ShowService showService(ClusterSharding sharding, Clock clock) {
 return new ShowService(sharding, clock);
 }
 }
 
 | 
这ActorSystem是一个相当沉重的结构。它应该只创建一次,是Bean. 创建类型化ActorSystem需要传递一些guardianBehavior. 此时,我们不需要这个功能,所以我们可以传递一个VoidBehavior:
| public class VoidBehavior {public static Behavior<Void> create() {
 return Behaviors.receive(Void.class).build();
 }
 }
 
 | 
guardianBehavior在手动创建actor的情况下更有用。在我们的案例中,我们正在使用分片来实现。
配置bean正在使用内存中的事件存储。这就是为什么akka-persistence-testkit_*依赖的范围必须是编译。这只是用于原型设计,在下一部分中,当我们引入一个可用于生产的事件存储时,它将被切换回测试。
 
控制器的测试:
使用WebTestClient:
| @SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)@DirtiesContext(classMode = ClassMode.AFTER_CLASS)
 class ShowControllerItTest {
 
 @Autowired
 private WebTestClient webClient;
 
 @Test
 public void shouldGetShowById() {
 //given
 String showId = randomShowId().id().toString();
 
 //when //then
 webClient.get().uri("/shows/{showId}", showId)
 .exchange()
 .expectStatus().isOk()
 .expectBody(ShowResponse.class).value(shouldHaveId(showId));
 }
 
 | 
值得注意的一点是,我们在每次测试后都关闭Spring Context,以避免Actor System的冲突:
| 2021-10-14 10:51:48,057 ERROR akka.io.TcpListener - Bind failed for TCP channel on endpoint [/127.0.0.1:2551]java.net.BindException: [/127.0.0.1:2551] Address already in use
 at java.base/sun.nio.ch.Net.bind0(Native Method)
 at java.base/sun.nio.ch.Net.bind(Net.java:555)
 at java.base/sun.nio.ch.ServerSocketChannelImpl.netBind(ServerSocketChannelImpl.java:337)
 at java.base/sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:294)
 at java.base/sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:89)
 
 | 
使用@DirtiesContext(classMode = ClassMode.AFTER_CLASS)是不够的,我们还需要配置ActorSystem Bean的销毁方法@Bean(destroyMethod = "terminate")。
另一种方法是在所有测试中重用 Spring Context 和 Actor System,但是我们不能像在ShowServiceTest中那样手动创建 Actor System 。
  
运行应用程序
我们的应用程序已经准备好通过CinemaApplication类或从命令行./mvnw spring-boot:run -Dspring-boot.run.jvmArguments="--enable-preview" 启动(确保你使用的是Java 17)。
你可以用development/show.http文件(需要IntelliJ IDEA Ultimate)或development/show.sh的curl运行一些请求。
  
总结
当涉及到打包和明确的责任分工时,我是一个有点控制狂的人。这就是为什么我为此添加了一个带有ArchUnit断言的测试。
PackageStructureValidationTest将检查模块之间(基础不应依赖保留)和单个模块内部是否有违反规则的情况。
领域层不应该依赖于应用、api、基础设施(用于未来的变化)和akka。
应用层不应该依赖api、基础设施等。所有的规则都可以用这张图来表示:

查看part_4标签并使用该应用程序。主要收获是,使用 Akka Persistence 之类的工具,我们可以非常快速地对 Event Sourced 应用程序进行原型设计。我们可以轻松地将其添加到现有的阻塞或非阻塞堆栈中。我们可以长时间不使用持久的事件存储,但我有一种感觉,您希望看到一些可用于生产的东西。