Java反应式事件溯源之第 2 部分:Actor 模型

22-01-23 banq

本节我们将解决并发访问的问题。. 我们的域代码非常优雅,但即使我们使用记录和一些不可变集合,它在多线程环境中也不是完全安全的。比如我们要实现在同时预定同一个座位的情况下,一个请求成功一个失败的保证。如何实施?在大多数情况下,您将在数据库级别引入某种乐观(或悲观)锁定。在这种情况下,您本身并没有处理并发。您宁愿将此责任转移到数据库。这对于中等负载的标准系统来说很好,但我们正在构建一个应该易于扩展的反应式解决方案。此外,您仅限于支持此类锁定的数据库,但情况并非总是如此。幸运的是,还有其他选项可以处理并发。它'实现Actor Model 的[url=https://doc.akka.io/docs/akka/current/typed/actors.html]Akka 堆栈[/url]。

Actor
Actor是一个非常简单但聪明的抽象,将帮助您轻松构建非常复杂的并发系统。您可以将其视为您所在州的保护者。如果您想从多个线程访问您的状态,以一种安全的方式,将其包装在 actor 中并只与 actor 对话。只有通过发送消息才能与参与者进行通信。每个参与者都有自己的消息框,它一次会消费一条消息。因此,通过正确的实现,2 个或更多并发线程无法对您的状态执行某些操作。

此外,来自 Akka的持久性参与者(也称为事件源参与者)能够以事件的形式持久化其状态。使用新的类型化 Actor API,您可能还会遇到entity. 它与sharding一起使用,当你想将你的Actor分布在不同的节点上时。这可能会让人感到困惑,所以在这一点上,每当您看到 Akka Entity 时,请记住,这实际上是一个Actor。当然,我现在跳过了很多细节,但是为了这一部分,我认为应该足以了解发生了什么。
将所需的依赖项添加到pom.xml后,您就可以创建您的第一个 actor。我们将只关注类型化 API。使用它更安全,建议将其作为默认选择。请注意,您仍然可以使用旧的经典 API。在功能方面,基本相同,但如果这是您第一次使用 Akka,请坚持使用类型化 API。犯一些愚蠢的错误会更难,编译器会为你检查很多东西。
要创建一个参与者或实体,我们需要定义它的行为。行为是封装参与者契约的抽象。如果您向我发送此消息,我会回复,或者根本不回复,这取决于您想要实现的目标。让我们检查一下并实现一个AdderActor.

record Add(int a, int b, ActorRef<Integer> replyTo) {
}

public class AdderActor extends AbstractBehavior<Add> {

    public AdderActor(ActorContext<Add> context) {
        super(context);
    }

    @Override
    public Receive<Add> createReceive() {
        return newReceiveBuilder().onMessage(Add.class, add -> {
            int result = add.a() + add.b();
            add.replyTo().tell(result);
            return Behaviors.same();
        }).build();
    }
}

我们需要扩展AbstractBehavior,并将其与我们的输入消息Add一起输入。输入的消息是一条记录,其中包含ints a和b,以及对需要回复结果的询问行为体的引用。处理这样一个消息是非常简单的。在消息的添加上,首先,用结果进行回复(告诉方法),然后返回相同的行为。所有的东西都是类型化的,所以没有办法用例如String来回复,编译器会为你检查合同。
 

Event sourced行为
使用来自 Akka 库的更高级别的抽象,对于事件溯源,有一种特殊的行为,称为EventSourcedBehaviour。因为我们想为每条消息发送一个回复,我们可以强制编译器使用EventSourcedBehaviorWithEnforcedReplies. 事件源行为使用 3 种其他类型参数化:Command、Event、State。看起来很眼熟,你不觉得吗?但是,这里有一个小的变化。我们不使用我们的域命令,而是使用一个包含域命令和回复所需的Actor引用的信封ShowCommandEnvelope。

public sealed interface ShowEntityCommand extends Serializable {

    record ShowCommandEnvelope(ShowCommand command, ActorRef<ShowEntityResponse> replyTo) implements ShowEntityCommand {
    }
}


为什么它有用呢?首先,我们的领域没有被Akka库触及,其次,我们以后会添加一些与我们的领域无关的命令。响应也是用ShowEntityResponse来类型化的。

public sealed interface ShowEntityResponse extends Serializable {

    final class CommandProcessed implements ShowEntityResponse {
    }

    record CommandRejected(ShowCommandError error) implements ShowEntityResponse {
    }
}

我们可以回复该命令已被处理或该命令因某些错误被拒绝。这就是我们的第一个事件源行为契约。

扩展EventSourcedBehavior*类需要实现3个方法。

@Override
public Show emptyState() {...}

@Override
public CommandHandlerWithReply<ShowEntityCommand, ShowEvent, Show> commandHandler() {...}

@Override
public EventHandler<Show, ShowEvent> eventHandler() {...}


 

ReplyEffect
因为我们只有一个命令,所以commandHandler的实现非常简单。

public CommandHandlerWithReply<ShowEntityCommand, ShowEvent, Show> commandHandler() {
    return newCommandHandlerWithReplyBuilder().forStateType(Show.class)
        .onCommand(ShowEntityCommand.ShowCommandEnvelope.class, this::handleShowCommand)
        .build();
}


有趣的部分是从handleShowCommand方法开始的。

private ReplyEffect<ShowEvent, Show> handleShowCommand(Show show, ShowEntityCommand.ShowCommandEnvelope envelope) {
    ShowCommand command = envelope.command();
    return show.process(command, clock).fold(
        error -> {
            return Effect().reply(envelope.replyTo(), new CommandRejected(error));
        },
        events -> {
            return Effect().persist(events.toJavaList())
                .thenReply(envelope.replyTo(), s -> new CommandProcessed());
        }
    );
}

这是我们将我们的事件源库与事件源域合并的地方。
要做到这一点,我们需要处理来自信封的域命令。有两种可能的结果。在出错的情况下,效果只是回复发件人说命令被拒绝。如果命令处理成功,我们将得到一个事件列表。
如前文所述,在我们用CommandProcessed消息回复之前,我们需要保存这些事件。
代码中没有明确说明的是(虽然有很好的记录),对于所有的持久化事件,将调用eventHandler方法,然后Akka Persistence将执行thenReply部分。由于EventSourcedBehaviorWithEnforcedReplies,编译器会检查我们是否返回ReplyEffect而不是简单的Effect。
为什么要在引擎盖下调用eventHandler?因为这样,你就不需要记住在所有的命令处理程序中使用它。另外,在恢复实体的过程中,当你需要加载给定聚合的所有事件并逐一应用它们时,这个方法会被调用。

public EventHandler<Show, ShowEvent> eventHandler() {
    return newEventHandlerBuilder()
        .forStateType(Show.class)
        .onAnyEvent(Show::apply);
}


在这种情况下,实现更加直接。我们只是将所有事件重定向到Show::apply域方法。
扩展EventSourcedBehavior*不仅为我们提供了处理命令、持久化和应用事件的良好抽象。它还以被动的方式保护状态免受并发访问。参与者线程不会被与数据库的通信阻塞。它将在底层事件存储响应后立即恢复工作。从那时起,它将使用更复杂的Stash机制版本缓冲所有命令。这基本上是一个的实现单个写入器的原理,建立真正的高性能一致的系统。
 

实体测试
好的,是时候运行它并测试它了。要创建一个Actor,您需要一个Actor系统。Actor系统就像Actor的“家”。必须管理所有参与者、它们的生命周期、调度策略等。您可以将其视为管理 Spring Bean 的 Spring Context。
对于测试,您不必手动创建 Actor System。您可以使用非常方便的实用程序,例如ActorTestKit某些单元测试配置覆盖的某些默认配置。不要忘记在所有测试后将其关闭。

private static final ActorTestKit testKit =        ActorTestKit.create(EventSourcedBehaviorTestKit.config().withFallback(UNIT_TEST_AKKA_CONFIGURATION));

@AfterAll
public static void cleanUp() {
    testKit.shutdownTestKit();
}

 
为了测试我们的事件源行为,至少有 2 种策略。

白盒测试
使用EventSourcedBehaviorTestKit的白盒测试非常好,因为您可以断言任何您想要的东西。不仅是响应,还有持久化的事件,以及actor内部的状态。

@Test
public void shouldReserveSeat() {
    //given
    var showId = ShowId.of();
    EventSourcedBehaviorTestKit<ShowEntityCommand, ShowEvent, Show> showEntityKit = EventSourcedBehaviorTestKit.create(testKit.system(), ShowEntity.create(showId, clock));
    var reserveSeat = randomReserveSeat(showId);

    //when
    var result = showEntityKit.<ShowEntityResponse>runCommand(replyTo -> toEnvelope(reserveSeat, replyTo));

    //then
    assertThat(result.reply()).isInstanceOf(CommandProcessed.class);
    assertThat(result.event()).isInstanceOf(SeatReserved.class);
    var reservedSeat = result.state().seats().get(reserveSeat.seatNumber()).get();
    assertThat(reservedSeat.isReserved()).isTrue();
}

 

黑盒测试
第二种策略是黑盒方法,您可以在其中与参与者互相调用并仅检查响应。Actor内部发生的事情对你是隐藏的。这种策略更接近Actor的生产使用。要从参与者那里获得响应,您需要使用称为 testing 的probe,它将模拟发送者参与者。

//given
var showId = ShowId.of();
var showEntityRef = testKit.spawn(ShowEntity.create(showId, clock));
var commandResponseProbe = testKit.<ShowEntityResponse>createTestProbe();

var reserveSeat = randomReserveSeat(showId);

//when
showEntityRef.tell(toEnvelope(reserveSeat, commandResponseProbe.ref()));

//then
commandResponseProbe.expectMessageClass(CommandProcessed.class);


 

行为设置
到目前为止,我没有涉及的一件事是ShowEntity.create方法。

public static Behavior<ShowEntityCommand> create(ShowId showId,
                                                 Clock clock) {
    return Behaviors.setup(context -> {
        PersistenceId persistenceId = PersistenceId.of("Show", showId.id().toString());
        return new ShowEntity(persistenceId, showId, clock, context);
    });
}

要创建一个Actor或实体,无论是通过 testKit.spoon 还是 EventSourcedBehaviorTestKit.create 方法,我们都需要传递 Behavior[T]。为此,我们使用了一个静态工厂方法。这个方法正在调用我们的私有构造函数,并用Behaviors.setup对其进行包装,我们可以返回Behavior[ShowEntityCommand]。PersistenceId(传递给构造函数)是一个键,我们将在这个键下存储所有的事件,对于一个给定的集合,在数据库中(这是下一课的主题)。
如果这让你感到困惑——别担心。花点时间,分析代码,运行测试,阅读日志。
 
与大多数教程相比,领域与技术(Actor)的东西是分开的。这种分离使您能够在将来从 Akka Persistence 切换到其他东西。域将保持不变。这很可能永远不会发生,但最好在代码库中分离关注点。通过这种方式添加新功能和改进应用程序将更加愉快。

请查看part_2标记中的完整源代码。

猜你喜欢