这里为 HTTP API 层选择了 Spring 框架,只是因为它非常流行。这可以是您想要的任何东西,只要记住我们正在构建一个反应式解决方案,因此使用具有非阻塞 API 的东西也是合理的,例如 Micronaut、Quarkus 等。
有ShowController2 个端点。第一个是ShowResponse通过id获取:
@RestController @RequestMapping(value = "/shows") public class ShowController {
private final ShowService showService;
public ShowController(ShowService showService) { this.showService = showService; }
@GetMapping(value = "{showId}", produces = "application/json") public Mono<ShowResponse> findById(@PathVariable UUID showId) { CompletionStage<ShowResponse> showResponse = showService.findShowBy(ShowId.of(showId)).thenApply(ShowResponse::from); return Mono.fromCompletionStage(showResponse); }
|
由于我们使用的是 Spring WebFlux,因此我们需要转换CompletionStage为Mono以保持反应性。在标准(阻塞)控制器中,我们需要阻塞并等待ShowService响应。
第二个端点更有趣,因为它用于座位预订和取消。
@PatchMapping(value = "{showId}/seats/{seatNum}", consumes = "application/json") public Mono<ResponseEntity<String>> reserve(@PathVariable("showId") UUID showIdValue, @PathVariable("seatNum") int seatNumValue, @RequestBody SeatActionRequest request) {
ShowId showId = ShowId.of(showIdValue); SeatNumber seatNumber = SeatNumber.of(seatNumValue); CompletionStage<ShowEntityResponse> actionResult = switch (request.action()) { case RESERVE -> showService.reserveSeat(showId, seatNumber); case CANCEL_RESERVATION -> showService.cancelReservation(showId, seatNumber); };
return Mono.fromCompletionStage(actionResult.thenApply(response -> switch (response) { case CommandProcessed ignored -> accepted().body(request.action() + " successful"); case CommandRejected rejected -> badRequest().body(request.action() + " failed with: " + rejected.error().name()); })); }
|
让我们跳过关于它是否是 RESTful 的讨论。在这种情况下,我们需要将来自服务的响应转换为适当的 HTTP 状态代码。
我们还需要一些基本的 Spring Beans 配置:
@Configuration class BaseConfiguration {
@Bean public Config config() { return PersistenceTestKitPlugin.config().withFallback(ConfigFactory.load()); }
@Bean(destroyMethod = "terminate") public ActorSystem<Void> actorSystem(Config config) { return ActorSystem.create(VoidBehavior.create(), "es-workshop", config); }
@Bean public ClusterSharding clusterSharding(ActorSystem<?> actorSystem) { return ClusterSharding.get(actorSystem); }
@Bean Clock clock() { return new Clock.UtcClock(); } }
@Configuration class ReservationConfiguration {
@Bean public ShowService showService(ClusterSharding sharding, Clock clock) { return new ShowService(sharding, clock); } }
|
这ActorSystem是一个相当沉重的结构。它应该只创建一次,是Bean. 创建类型化ActorSystem需要传递一些guardianBehavior. 此时,我们不需要这个功能,所以我们可以传递一个VoidBehavior:
public class VoidBehavior { public static Behavior<Void> create() { return Behaviors.receive(Void.class).build(); } }
|
guardianBehavior在手动创建actor的情况下更有用。在我们的案例中,我们正在使用分片来实现。
配置bean正在使用内存中的事件存储。这就是为什么akka-persistence-testkit_*依赖的范围必须是编译。这只是用于原型设计,在下一部分中,当我们引入一个可用于生产的事件存储时,它将被切换回测试。
控制器的测试:
使用WebTestClient:
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT) @DirtiesContext(classMode = ClassMode.AFTER_CLASS) class ShowControllerItTest {
@Autowired private WebTestClient webClient;
@Test public void shouldGetShowById() { //given String showId = randomShowId().id().toString();
//when //then webClient.get().uri("/shows/{showId}", showId) .exchange() .expectStatus().isOk() .expectBody(ShowResponse.class).value(shouldHaveId(showId)); }
|
值得注意的一点是,我们在每次测试后都关闭Spring Context,以避免Actor System的冲突:
2021-10-14 10:51:48,057 ERROR akka.io.TcpListener - Bind failed for TCP channel on endpoint [/127.0.0.1:2551] java.net.BindException: [/127.0.0.1:2551] Address already in use at java.base/sun.nio.ch.Net.bind0(Native Method) at java.base/sun.nio.ch.Net.bind(Net.java:555) at java.base/sun.nio.ch.ServerSocketChannelImpl.netBind(ServerSocketChannelImpl.java:337) at java.base/sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:294) at java.base/sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:89)
|
使用@DirtiesContext(classMode = ClassMode.AFTER_CLASS)是不够的,我们还需要配置ActorSystem Bean的销毁方法@Bean(destroyMethod = "terminate")。
另一种方法是在所有测试中重用 Spring Context 和 Actor System,但是我们不能像在ShowServiceTest中那样手动创建 Actor System 。
运行应用程序
我们的应用程序已经准备好通过CinemaApplication类或从命令行./mvnw spring-boot:run -Dspring-boot.run.jvmArguments="--enable-preview" 启动(确保你使用的是Java 17)。
你可以用development/show.http文件(需要IntelliJ IDEA Ultimate)或development/show.sh的curl运行一些请求。
总结
当涉及到打包和明确的责任分工时,我是一个有点控制狂的人。这就是为什么我为此添加了一个带有ArchUnit断言的测试。
PackageStructureValidationTest将检查模块之间(基础不应依赖保留)和单个模块内部是否有违反规则的情况。
领域层不应该依赖于应用、api、基础设施(用于未来的变化)和akka。
应用层不应该依赖api、基础设施等。所有的规则都可以用这张图来表示:

查看part_4标签并使用该应用程序。主要收获是,使用 Akka Persistence 之类的工具,我们可以非常快速地对 Event Sourced 应用程序进行原型设计。我们可以轻松地将其添加到现有的阻塞或非阻塞堆栈中。我们可以长时间不使用持久的事件存储,但我有一种感觉,您希望看到一些可用于生产的东西。