Java反应式事件溯源:领域


这篇博文开始了一系列文章,这些文章将从许多不同的角度非常深入地展示事件溯源模式的实现。我即将发布的帖子背后的主要目标是:

  1. 让您相信事件溯源并不难实现,
  2. 提供正确的工具来帮助您快速完成这项工作,
  3. 展示如何在没有任何框架依赖项的情况下对域代码进行建模
  4. 解释如何在没有乐观(或悲观)锁定的情况下创建一致且可扩展的应用程序
  5. 并展示如何使用您最喜欢的堆栈(如 Spring)在 Java 中执行此操作。

 
领域
许多人认为事件溯源模式实现起来非常复杂。我可以对此提出异议,但我知道这可能具有挑战性,尤其是第一次。对我来说有问题的是,当您根本没有选择并且您需要实现非常接近事件溯源或完全事件溯源适应的东西时,但您仍然拒绝遵循这条道路,因为这是新事物,具有挑战性,而您对此没有任何经验。根据我的观察,这种恐惧驱使您采用一种经典的方法来实现状态持久性,并带有一些附加功能。随着时间的推移,与基于事件溯源的实现相比,这些模仿事件溯源的附加功能变得非常复杂,难以理解和维护,令人惊讶的是,事件溯源可以简化很多。
让我们从一个基本理论开始。我建议您跟踪这个Github 存储库。我不想重复所有这些信息,因为我更喜欢通过实践学习的方法。理论部分将被最小化。
从领域的角度来看,事件溯源是一种非常简单的模式。有3个主要构建块:
  • 命令——定义我们想要在系统中发生的事情,
  • 状态——它通常是 DDD 方法的聚合,负责保持系统的某些部分一致和有效(聚合不变量)
  • 事件——捕捉系统中发生的事情。

状态/聚合通常需要提供2 个入口点方法:
  • 列表 进程(命令命令),
  • 状态应用(事件事件)。

这个系列最难的部分是找到一个好的领域示例,不像大多数教程那样简单,但也不太复杂,更多地关注事件溯源部分。想象一下,我们正在创建一个用于销售电影演出门票的应用程序。在我们简单的实现中,状态被建模为 Show 聚合,它代表一个电影节目,您可以在其中预订座位。显示记录包含一个id、title和 的集合seats,在我们的例子中,是一个方便使用的Map,但它可以是任何东西。其他字段,如节目Show的实际时间,可能还有一些其他字段被省略。

record Show(ShowId id, String title, Map<SeatNumber, Seat> seats) {

    public static final BigDecimal INITIAL_PRICE = new BigDecimal("100");

    public static Show create(ShowId showId) {
        return new Show(showId,
"Show title " + showId.id(), SeatsCreator.createSeats(INITIAL_PRICE));
    }
}

Seat 封装了如下信息:座位号、状态和价格:

record Seat(SeatNumber number, SeatStatus status, BigDecimal price)

enum SeatStatus {
    AVAILABLE, RESERVED
}

到目前为止,Show的构建被简化为只有一种工厂方法,它将使用另一种辅助方法以初始价格填充 10 个可用席位。

现在,让我们预订一个座位。要实现这个简单的用例,首先,我们需要一个命令。

sealed interface ShowCommand extends Serializable {
    ShowId showId();

    record ReserveSeat(ShowId showId, SeatNumber seatNumber) implements ShowCommand {
    }
}

ReserveSeat命令是一条记录,因为我们不想修改它。它还扩展了通用接口ShowCommand。这个接口是密封的,所以它的所有实现都在里面。我们不允许将其扩展到其他任何地方,因为它没有任何意义。该命令将被process前面提到的方法使用。

public Either<ShowCommandError, List<ShowEvent>> process(ShowCommand command, Clock clock) {
    return switch (command) {
        case ReserveSeat reserveSeat -> handleReservation(reserveSeat, clock);
    };
}

这个方法的签名对你来说可能很有趣。我们使用Either来自Vavr 库的类型,因此我们可以在可能失败的情况下返回ShowCommandError,
或者在命令成功处理的情况下返回事件列表。如果这对您来说是新事物,并且您在业务失败的情况下抛出异常,我真的建议您熟悉 Vavr 库并更广泛地使用类型系统。
Show聚合也是使用 Vavr 集合(Map 和 List),因此整个状态是不可变的(虽然不是强制性的,我们将在下一篇文章中讨论)。此外,我们将时钟作为方法参数传递。测试这样的代码更容易,因为我们总是可以模拟时钟。
处理预订非常简单。

private Either<ShowCommandError, List<ShowEvent>> handleReservation(ReserveSeat reserveSeat, Clock clock) {
    SeatNumber seatNumber = reserveSeat.seatNumber();
    return seats.get(seatNumber).<Either<ShowCommandError, List<ShowEvent>>>map(seat -> {
        if (seat.isAvailable()) {
            return right(List.of(new SeatReserved(id, clock.now(), seatNumber)));
        } else {
            return left(SEAT_NOT_AVAILABLE);
        }
    }).getOrElse(left(SEAT_NOT_EXISTS));
}

首先,我们需要找到座位,如果这样的座位不存在,我们可以返回SEAT_NOT_EXISTS错误,或者如果座位不可用SEAT_NOT_AVAILABLE。最后,如果有空位,我们可以预订。实际上,域验证会更加复杂,只要记住该process方法是适合它的地方。请注意,我们并没有改变状态,只是返回一个事件。

sealed interface ShowEvent extends Serializable {
    ShowId showId();

    Instant createdAt();

    record SeatReserved(ShowId showId, Instant createdAt, SeatNumber seatNumber) implements ShowEvent {
    }
}

这可能看起来与命令非常相似,但它完全不同。该SeatReserved事件是我们系统中的一个事实,将存储在我们的事件存储中。这是事件溯源的本质,其中命令只是用于操作封装的方便 DTO。
正如我所提到的,当我们离开 process 方法时,状态将完全相同。我们没有改变它。我们不能在这里改变它。我们正在实现事件溯源模式,所以要习惯我们数据库中的状态不再存在的事实。它是通过重播所有持久事件而创建的。在我们改变状态之前,我们需要持久化一个(或多个)事件,然后我们才能应用它并获得一个新版本的状态。这就是为什么我们需要一个单独的方法来应用事件。
理解在 process 方法中改变状态可能会导致非常讨厌的错误非常重要,其中处理命令后的状态与从事件重建时的状态不同。应用事件方法签名不返回任何错误。这一次,如果我们找不到座位,我们将抛出异常。没有办法优雅地处理这种情况。这不是业务错误。很可能是实现失败或并发问题。通常,这不应该发生。实现的另一个关键方面是此方法不会有任何副作用,因为这些也将在状态恢复期间执行。

public Show apply(ShowEvent event) {
    return switch (event) {
        case SeatReserved seatReserved -> applyReserved(seatReserved);
    };
}

private Show applyReserved(SeatReserved seatReserved) {
    Seat seat = getSeatOrThrow(seatReserved.seatNumber());
    return new Show(id, title, seats.put(seat.number(), seat.reserved()));
}

private Seat getSeatOrThrow(SeatNumber seatNumber) {
    return seats.get(seatNumber).getOrElseThrow(() -> new IllegalStateException("Seat not exists %s".formatted(seatNumber)));
}

在我们的例子中,应用SeatReserved将简单地覆盖具有保留状态的席位,所有这些都以不可变的方式进行。
这并不难,你不觉得吗?当然,仍然缺少很多重要的部分,如持久性、并发访问等。别担心,我们稍后会介绍。实际上,我是故意这样做的,因为我想强调一个想法。您的域代码是源代码中最重要的部分。您将使用的所有框架、数据库等也是相关的,但不要让它们决定您应该如何实现您的域。让这成为我们的座右铭。
 
测试
使用这种方法,我们不需要启动 Spring Context 或 Actor System(或任何繁重的东西)来测试域代码。简单的单元测试就可以完成这项工作。

private Clock clock = new FixedClock(Instant.now());

@Test
public void shouldReserveTheSeat() {
    //given
    var show = randomShow();
    var reserveSeat = randomReserveSeat(show.id());

   
//when
    var events = show.process(reserveSeat, clock).get();

   
//then
    assertThat(events).containsOnly(new SeatReserved(show.id(), clock.now(), reserveSeat.seatNumber()));
}

第一个测试是相当明显的。我们需要检查返回的事件是否是我们预期的。如您所见,我们使用固定时钟进行测试。
有时,我们可以应用它们并断言状态本身,而不是断言事件。

//when
var events = show.process(reserveSeat, clock).get();
var updatedShow = apply(show, events);

//then
var reservedSeat = updatedShow.seats().get(reserveSeat.seatNumber()).get();
assertThat(events).containsOnly(new SeatReserved(show.id(), clock.now(), reserveSeat.seatNumber()));
assertThat(reservedSeat.isAvailable()).isFalse();

我们预计updatedShow变量中选定的座位不再可用。其余测试将在 Github 上提供。
 
扩展代码
添加另一个命令怎么样?我们也想取消我们的预订。这和以前的故事一样。我们需要处理CancelSeatReservation命令,返回一个新SeatReservationCancelled事件,并应用这个事件。

public Either<ShowCommandError, List<ShowEvent>> process(ShowCommand command, Clock clock) {
    return switch (command) {
        case ReserveSeat reserveSeat -> handleReservation(reserveSeat, clock);
        case CancelSeatReservation cancelSeatReservation -> handleCancellation(cancelSeatReservation, clock);
    };
}

public Show apply(ShowEvent event) {
    return switch (event) {
        case SeatReserved seatReserved -> applyReserved(seatReserved);
        case SeatReservationCancelled seatReservationCancelled -> applyReservationCancelled(seatReservationCancelled);
    };
}


您可能会注意到我们在 Java 17 中的switch 语句中使用了模式匹配。它仍然是一个预览功能,因此您需要明确启用它。可能在生产环境中,您可以坚持 if 语句中的模式匹配,但我也想在创建这些示例时获得一些乐趣。使用密封Seal接口,一旦您添加了新的命令或事件,Java 编译器将突出显示您需要更新代码的所有地方。
 
这就是第一部分。完整的源代码可在此处获得,记得查看part_1标签