Java反应式事件溯源之第3部分:服务


本节主要主题是将ShowEntity隐藏在一个不错的服务包装下。否则,与Actor交互所需的逻辑将在许多地方重复,这总是一个坏主意。六边形角度的角度来看,ShowService这将是我们的端口,将来可供任何适配器使用。
 
查询实体
在此之前,我们将从非常简单的事情开始。有时我们想向实体询问一些数据以供阅读。为了使它成为可能,我们需要添加另一个命令GetShow。现在,我知道你在想什么。这不是一个命令,它更像是一个查询。是的,这是真的。因此,所有内容都是类型化的,我们需要遵循通用合同。合同是我们的事件源实体将接受任何扩展的消息ShowEntityCommad。闭上眼睛,把GetShow它当作一个命令来获取一些数据。

public sealed interface ShowEntityCommand extends Serializable {

    record ShowCommandEnvelope(ShowCommand command, ActorRef<ShowEntityResponse> replyTo) implements ShowEntityCommand {
    }

    record GetShow(ActorRef<Show> replyTo) implements ShowEntityCommand {
    }
}

处理这样的命令非常简单。

public CommandHandlerWithReply<ShowEntityCommand, ShowEvent, Show> commandHandler() {
    return newCommandHandlerWithReplyBuilder().forStateType(Show.class)
        .onCommand(ShowEntityCommand.GetShow.class, this::returnState)
        .onCommand(ShowEntityCommand.ShowCommandEnvelope.class, this::handleShowCommand)
        .build();
}

private ReplyEffect<ShowEvent, Show> returnState(Show show, ShowEntityCommand.GetShow getShow) {
    return Effect().reply(getShow.replyTo(), show);
}

警告:只有当状态是不可变的时,这种实现才可以,就像我们的例子一样。状态可能是可变的,但是在我们响应它之前,我们需要创建一个完整的副本。否则,我们可能会破坏参与者/实体封装,我们将不再是线程安全的。
多亏了这个GetShow命令,我们可以用更多的逻辑来扩展我们的黑盒测试,这些逻辑也会在处理完预留座位命令后检查状态。

@Test
public void shouldReserveSeat_WithProbe() {
    //given
    var showId = ShowId.of();
    var showEntityRef = testKit.spawn(ShowEntity.create(showId, clock));
    var commandResponseProbe = testKit.<ShowEntityResponse>createTestProbe();
    var showResponseProbe = testKit.<Show>createTestProbe();

    var reserveSeat = randomReserveSeat(showId);

   
//when
    showEntityRef.tell(toEnvelope(reserveSeat, commandResponseProbe.ref()));

   
//then
    commandResponseProbe.expectMessageClass(CommandProcessed.class);

   
//when
    showEntityRef.tell(new ShowEntityCommand.GetShow(showResponseProbe.ref()));

   
//then
    Show returnedShow = showResponseProbe.receiveMessage();
    assertThat(returnedShow.seats().get(reserveSeat.seatNumber()).get().isReserved()).isTrue();
}

一些纯粹主义者可能会说我们应该为此创建一个读取模型,而命令处理方不应该负责查询。当然,我们将在另一篇文章中讨论 CQRS,但我更喜欢更实用的方法。从实体读取数据(通常)是一项非常便宜的操作,所以为什么不利用这一点。
 
服务
这只是做一些更有趣的事情所需的开端。您可能已经注意到,根据上面的测试,与Actor交互需要一些仪式。我的建议是将它封装在一个漂亮、干净的服务中。我们希望有一个ShowService只公开 3 种方法的方法:
public class ShowService {

    public CompletionStage<Show> findShowBy(ShowId showId) {    }

    public CompletionStage<ShowEntityResponse> reserveSeat(ShowId showId, SeatNumber seatNumber) {}

    public CompletionStage<ShowEntityResponse> cancelReservation(ShowId showId, SeatNumber seatNumber) {  }
}

由于我们正在构建一个反应式的解决方案,返回类型被CompletitionStage包裹,准备与其他阶段的处理相结合。返回ShowEntityResponse,它是ShowEntity契约的一部分,可能看起来像一个抽象的泄漏。这取决于惯例,如果这对你来说是有问题的,那就返回一些更具体的服务合同。在这里,这样做将是非常人为的,但在某些情况下,这将是更自然的。
 
分片Sharding
该服务的主要职责是隐藏或简化与ShowEntity的所有通信。我们将使用Akka Sharding,而不是像我们的测试中那样手动生成Acto:testKit.spoon(ShowEntity.create(showId, clock))。长话短说,分片让你有能力在Akka集群的不同节点上创建(并与Actor对话)。这样一来,我们就可以非常容易地分散负载,实现分布式单写原则
等等!这是否意味着我现在需要运行Akka集群?这并不像你想象的那么难,但答案是否定的(或者说不完全是)。
我们可以创建一个只有一个节点的单实例集群。它的行为会像一个标准的应用程序。在未来,如果我们决定要扩展并处理更多的流量,那么我们就可以通过许多节点、滚动更新、动态扩展等方式来实现Akka集群的全部潜力。最重要的是,我们只需要改变配置文件。负责与Actor对话的代码将是完全一样的。位置透明性实际上是Akka架构的主要驱动力之一。一切都被设计为默认在分布式设置中工作。
要启用分片,我们需要在application.conf文件中进行以下设置。
akka {

  actor {
    provider = "cluster"
  }

  cluster {
    seed-nodes = [
"akka://es-workshop@127.0.0.1:2551"]
  }

  remote {
    artery {
      canonical.hostname = 127.0.0.1
      canonical.port = 2551
    }
  }
}

Akka配置使用的是HOCON格式。我个人认为,这比YAML要好得多。在上面的片段中,我们把akka.actor.provider从本地改为集群。形成集群所需的种子节点与应用程序的地址相同。远程协议是artery(一个默认选项)。我们将在后面讨论dockerization和扩展时详细介绍这些设置。
为了获得EntityRef,我们需要用适当的Entity工厂方法来初始化sharding。这可以在服务构造函数中完成。

public ShowService(ClusterSharding sharding, Clock clock) {
    this.sharding = sharding;
    sharding.init(Entity.of(SHOW_ENTITY_TYPE_KEY, entityContext -> {
        ShowId showId = new ShowId(UUID.fromString(entityContext.getEntityId()));
        return ShowEntity.create(showId, clock);
    }));
}

多亏了entityRefFor方法,我们终于可以获得一个给定的showId的引用。
private EntityRef<ShowEntityCommand> getShowEntityRef(ShowId showId) {
    return sharding.entityRefFor(SHOW_ENTITY_TYPE_KEY, showId.id().toString());
}

同样的showId将在entityContext.getEntityId()方法中可用。
当我们通过EntityRef发送消息时,分片逻辑将检查ActorSystem中是否已经存在这样一个实体。
如果没有,那么它将获得如何创建实体的指令(在SHOW_ENTITY_TYPE_KEY下)。使用lambda表达式(上文)来建立它,并将消息路由到一个新的ShowEntity。
另一方面,如果实体已经存在于ActorSystem中,那么就不需要额外的步骤了。消息可以立即被路由。您不必为每个消息重新创建状态(这可能需要一些时间)。
这就是为什么基于Actor的事件源(用Akka实现)如此之快,因为一旦你加载状态,它就会等待处理下一个命令。
这就是为什么从实体中读取信息很便宜,因为在很多情况下,它不需要任何事件存储的访问。
有了Single Writer 单写原则(在第2部分中提到),我们就可以利用一致的writ-through cache的力量。与Akka Sharding和Akka Cluster一起,我们可以拥抱ActorSystem的全部潜力,玩转分布式单写者原则,但这是另一篇文章的主题。
从交互的角度来看,EntityRef的工作方式与ActorRef完全相同,但它是一个独立的抽象,以强调我们在使用分片时,实体的生命周期比普通Actor的情况下更复杂。

与Actor的交互有很多选择。在我们的服务层中,我们将采用Request-Response并使用ask方法。

public CompletionStage<Show> findShowBy(ShowId showId) {
    return getShowEntityRef(showId).ask(replyTo -> new ShowEntityCommand.GetShow(replyTo), askTimeout);
}


ask的方法可能会让人感到困惑,因为第一个参数是一个lambda函数,它将创建一个消息给Actor,并将replyTo引用到提问的Actor。
幸运的是,我们不需要创建这样一个Actor(就像我们这里的测试一样)。一切都被封装在ask方法中。
这看起来很奇怪,因为在引擎盖下,与actor通信的唯一方法是通过告诉方法(fire and forget)。如果你还不清楚,那么我建议仔细阅读关于交互模式的官方文档。
ask方法的第二个问题是,有时你必须帮助Java编译器确定返回类型,例如,这段代码将无法编译。

public CompletionStage<Show> findShowBy(ShowId showId) {
    var result = getShowEntityRef(showId).ask(replyTo -> new ShowEntityCommand.GetShow(replyTo), askTimeout);
    return result;
}

这样做更好:

public CompletionStage<Show> findShowBy(ShowId showId) {
    var result = getShowEntityRef(showId).<Show>ask(replyTo -> new ShowEntityCommand.GetShow(replyTo), askTimeout);
    // this will also work
   
// CompletionStage<Show> result = getShowEntityRef(showId).<Show>ask(replyTo -> new ShowEntityCommand.GetShow(replyTo), askTimeout);
    return result;
}

 
服务测试
测试ShowService应该是一项相当容易的任务。这一次,我们将手动创建ActorSystem(带有内存事件存储),只是为了检查在没有测试工具的情况下如何完成这项工作。
private static Config config = PersistenceTestKitPlugin.config().withFallback(ConfigFactory.load());
    private static ActorSystem system = ActorSystem.create("es-workshop", config);
    private ClusterSharding sharding = ClusterSharding.get(Adapter.toTyped(system));
    private Clock clock = new Clock.UtcClock();
    private ShowService showService = new ShowService(sharding, clock);

    @AfterAll
    public static void cleanUp() {
        TestKit.shutdownActorSystem(system);
    }

    @Test
    public void shouldReserveSeat() throws ExecutionException, InterruptedException {
       
//given
        var showId = ShowId.of();
        var seatNumber = randomSeatNumber();

       
//when
        var result = showService.reserveSeat(showId, seatNumber).toCompletableFuture().get();

       
//then
        assertThat(result).isInstanceOf(ShowEntityResponse.CommandProcessed.class);
    }

    @Test
    public void shouldCancelReservation() throws ExecutionException, InterruptedException {
       
//given
        var showId = ShowId.of();
        var seatNumber = randomSeatNumber();

       
//when
        var reservationResult = showService.reserveSeat(showId, seatNumber).toCompletableFuture().get();

       
//then
        assertThat(reservationResult).isInstanceOf(ShowEntityResponse.CommandProcessed.class);

       
//when
        var cancellationResult = showService.cancelReservation(showId, seatNumber).toCompletableFuture().get();

       
//then
        assertThat(cancellationResult).isInstanceOf(ShowEntityResponse.CommandProcessed.class);
    }

如果你分析一下日志,你可以看到单节点集群是正确形成:

Cluster Node [akka://es-workshop@127.0.0.1:2551] - Starting up, Akka version [2.6.16] ...
Cluster Node [akka:
//es-workshop@127.0.0.1:2551] - Registered cluster JMX MBean [akka:type=Cluster]
Cluster Node [akka:
//es-workshop@127.0.0.1:2551] - Started up successfully
Cluster Node [akka:
//es-workshop@127.0.0.1:2551] - No downing-provider-class configured, manual cluster downing required, see https://doc.akka.io/docs/akka/current/typed/cluster.htmldowning
Cluster Node [akka:
//es-workshop@127.0.0.1:2551] - Node [akka://es-workshop@127.0.0.1:2551] is JOINING itself (with roles [dc-default], version [0.0.0]) and forming new cluster
Cluster Node [akka:
//es-workshop@127.0.0.1:2551] - is the new leader among reachable nodes (more leaders may exist)
Cluster Node [akka:
//es-workshop@127.0.0.1:2551] - Leader is moving node [akka://es-workshop@127.0.0.1:2551] to [Up]

 
总结
ShowService是我们业务功能的主要端口。实际的服务合同当然可以根据您的需要和标准进行调整。在实体周围使用服务包装器将使代码更具可重用性和容错性。
请从part_3标签中查看完整的源代码,运行所有测试并分析日志