Java反应式事件溯源之第 4 部分:控制器


这里为 HTTP API 层选择了 Spring 框架,只是因为它非常流行。这可以是您想要的任何东西,只要记住我们正在构建一个反应式解决方案,因此使用具有非阻塞 API 的东西也是合理的,例如 Micronaut、Quarkus 等。
有ShowController2 个端点。第一个是ShowResponse通过id获取:

@RestController
@RequestMapping(value = "/shows")
public class ShowController {

    private final ShowService showService;

    public ShowController(ShowService showService) {
        this.showService = showService;
    }

    @GetMapping(value =
"{showId}", produces = "application/json")
    public Mono<ShowResponse> findById(@PathVariable UUID showId) {
        CompletionStage<ShowResponse> showResponse = showService.findShowBy(ShowId.of(showId)).thenApply(ShowResponse::from);
        return Mono.fromCompletionStage(showResponse);
    }

由于我们使用的是 Spring WebFlux,因此我们需要转换CompletionStage为Mono以保持反应性。在标准(阻塞)控制器中,我们需要阻塞并等待ShowService响应。
第二个端点更有趣,因为它用于座位预订和取消。

@PatchMapping(value = "{showId}/seats/{seatNum}", consumes = "application/json")
public Mono<ResponseEntity<String>> reserve(@PathVariable(
"showId") UUID showIdValue,
                                            @PathVariable(
"seatNum") int seatNumValue,
                                            @RequestBody SeatActionRequest request) {

    ShowId showId = ShowId.of(showIdValue);
    SeatNumber seatNumber = SeatNumber.of(seatNumValue);
    CompletionStage<ShowEntityResponse> actionResult = switch (request.action()) {
        case RESERVE -> showService.reserveSeat(showId, seatNumber);
        case CANCEL_RESERVATION -> showService.cancelReservation(showId, seatNumber);
    };

    return Mono.fromCompletionStage(actionResult.thenApply(response -> switch (response) {
        case CommandProcessed ignored -> accepted().body(request.action() +
" successful");
        case CommandRejected rejected -> badRequest().body(request.action() +
" failed with: " + rejected.error().name());
    }));
}

让我们跳过关于它是否是 RESTful 的讨论。在这种情况下,我们需要将来自服务的响应转换为适当的 HTTP 状态代码。
我们还需要一些基本的 Spring Beans 配置:
@Configuration
class BaseConfiguration {

    @Bean
    public Config config() {
        return PersistenceTestKitPlugin.config().withFallback(ConfigFactory.load());
    }

    @Bean(destroyMethod = "terminate")
    public ActorSystem<Void> actorSystem(Config config) {
        return ActorSystem.create(VoidBehavior.create(),
"es-workshop", config);
    }

    @Bean
    public ClusterSharding clusterSharding(ActorSystem<?> actorSystem) {
        return ClusterSharding.get(actorSystem);
    }

    @Bean
    Clock clock() {
        return new Clock.UtcClock();
    }
}

@Configuration
class ReservationConfiguration {

    @Bean
    public ShowService showService(ClusterSharding sharding, Clock clock) {
        return new ShowService(sharding, clock);
    }
}

这ActorSystem是一个相当沉重的结构。它应该只创建一次,是Bean. 创建类型化ActorSystem需要传递一些guardianBehavior. 此时,我们不需要这个功能,所以我们可以传递一个VoidBehavior:

public class VoidBehavior {
    public static Behavior<Void> create() {
        return Behaviors.receive(Void.class).build();
    }
}

guardianBehavior在手动创建actor的情况下更有用。在我们的案例中,我们正在使用分片来实现。
配置bean正在使用内存中的事件存储。这就是为什么akka-persistence-testkit_*依赖的范围必须是编译。这只是用于原型设计,在下一部分中,当我们引入一个可用于生产的事件存储时,它将被切换回测试。
 
控制器的测试:
使用WebTestClient:
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
@DirtiesContext(classMode = ClassMode.AFTER_CLASS)
class ShowControllerItTest {

    @Autowired
    private WebTestClient webClient;

    @Test
    public void shouldGetShowById() {
        //given
        String showId = randomShowId().id().toString();

       
//when //then
        webClient.get().uri(
"/shows/{showId}", showId)
                .exchange()
                .expectStatus().isOk()
                .expectBody(ShowResponse.class).value(shouldHaveId(showId));
    }

值得注意的一点是,我们在每次测试后都关闭Spring Context,以避免Actor System的冲突:

2021-10-14 10:51:48,057 ERROR akka.io.TcpListener - Bind failed for TCP channel on endpoint [/127.0.0.1:2551]
java.net.BindException: [/127.0.0.1:2551] Address already in use
    at java.base/sun.nio.ch.Net.bind0(Native Method)
    at java.base/sun.nio.ch.Net.bind(Net.java:555)
    at java.base/sun.nio.ch.ServerSocketChannelImpl.netBind(ServerSocketChannelImpl.java:337)
    at java.base/sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:294)
    at java.base/sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:89)

使用@DirtiesContext(classMode = ClassMode.AFTER_CLASS)是不够的,我们还需要配置ActorSystem Bean的销毁方法@Bean(destroyMethod = "terminate")。
另一种方法是在所有测试中重用 Spring Context 和 Actor System,但是我们不能像在ShowServiceTest中那样手动创建 Actor System 。
  

运行应用程序
我们的应用程序已经准备好通过CinemaApplication类或从命令行./mvnw spring-boot:run -Dspring-boot.run.jvmArguments="--enable-preview" 启动(确保你使用的是Java 17)。

你可以用development/show.http文件(需要IntelliJ IDEA Ultimate)或development/show.sh的curl运行一些请求。
  
总结
当涉及到打包和明确的责任分工时,我是一个有点控制狂的人。这就是为什么我为此添加了一个带有ArchUnit断言的测试。
PackageStructureValidationTest将检查模块之间(基础不应依赖保留)和单个模块内部是否有违反规则的情况。
领域层不应该依赖于应用、api、基础设施(用于未来的变化)和akka。
应用层不应该依赖api、基础设施等。所有的规则都可以用这张图来表示:

查看part_4标签并使用该应用程序。主要收获是,使用 Akka Persistence 之类的工具,我们可以非常快速地对 Event Sourced 应用程序进行原型设计。我们可以轻松地将其添加到现有的阻塞或非阻塞堆栈中。我们可以长时间不使用持久的事件存储,但我有一种感觉,您希望看到一些可用于生产的东西。