使用Axon重播投射事件 - codecentric AG Blog

19-12-28 banq
              

事件存储是任何事件源应用程序的核心。它包含系统生命周期中发生的每个事件。这些事件包含应用程序中的每个状态更改。EventSourcing通常与命令查询责任隔离(CQRS)结合使用。对于Axon而言,这意味着可以在投影中实现单独的读取面。 通常,事件在两个主要位置影响了软件的当前状态:聚合和投影。 

事件处理器

如果您熟悉Axon,则可能知道处理器有两种类型:订阅事件处理器和跟踪事件处理器。这些处理器之间的主要区别在于,订阅事件处理器将自己订阅事件源,并由发布机制管理的线程调用。另一方面,跟踪事件处理器使用它自己管理的线程从源中提取消息。

投射

我们软件的当前状态是由聚合执行这些事件的结果。投影是基于这些事件的特定视图,每个视图都有其自己的目的。例如,如果您向新银行注册,则将在“ CustomerRegisteredEvent”行中发出事件。您可以想象一些投射对此事件感兴趣。比如可以跟踪已注册到银行的客户的所有个人信息的计划,或者可以跟踪客户在填写其所需的所有信息方面所取得的进展的计划。  

但是,随着时间的流逝,需求将发生变化,您将看到您希望更新结构和内容上的投射。这就是Axon重播的地方。重播是Axon框架支持的CQRS和EventSourcing的核心概念。

仅当您使用跟踪事件处理器时,才可以在Axon中重播。 

一旦指定了要重播的事件,Axon将:

  • 在重置之前,使用提供的事件和令牌的值创建重播令牌
  • 更新令牌存储中的令牌
  • 从该点打开事件流
  • 阅读事件
  • 在跟踪事件处理器上以工作单元处理事件

Spring Rest API

如何发起重播?有多种方法可以做到这一点。从Axon 4开始,可以在Axon Server中重播,但这超出了本博客文章的范围。另一种方法是在某个时间点重置跟踪事件处理器。此解决方案的一个警告是,您必须在重置之前手动停止跟踪事件处理器。第三种选择是,您可以手动更新令牌存储中的跟踪令牌以重新处理过去的事件,最后,我们的首选解决方案是使用Axon提供的Replay API。 

我们在Spring中利用此Replay API创建了安全的Rest API。我们唯一需要提供的信息是我们要重播的位置以及跟踪事件处理器的名称。在重置处理器之前,我们检查了处理器是否实际上在此实例上运行,如果是,则通过代码将其关闭。最终,Axon创建了一个重设令牌,该令牌包含重置时令牌的值,并将其插入令牌存储中。当我们重播所有事件时,Axon会将重播令牌转换回跟踪令牌,并从中断处继续执行。

为了让我们对TrackingEventProcessorService的外观有所了解:

@Service
@Slf4j
public class TrackingEventProcessorService {
 
  private final EventProcessingConfiguration eventProcessingConfiguration;
 
  public TrackingEventProcessorService(
        EventProcessingConfiguration eventProcessingConfiguration
  ) {
     this.eventProcessingConfiguration = eventProcessingConfiguration;
  }
 
  private TrackingEventProcessor getTrackingEventProcessor(String name) {
     return this.eventProcessingConfiguration
           .eventProcessor(name, TrackingEventProcessor.class)
           .orElseThrow(TrackingEventProcessorNotFoundException::new);
  }
 
  public boolean replay(String trackingEventProcessorName, Long index) {
     TrackingEventProcessor trackingEventProcessor = this.getTrackingEventProcessor(trackingEventProcessorName);
     if (!trackingEventProcessor.isRunning()) {
        this.logger
              .warn(
                    "Tracking event processor {} is not running in current instance or not running at all",
                    trackingEventProcessorName
              );
        return false;
     }
 
     trackingEventProcessor.shutDown();
 
     try {
        trackingEventProcessor.resetTokens(GapAwareTrackingToken.newInstance(index - 1, Collections.emptySortedSet()));
     } catch (UnableToClaimTokenException e) {
        // Ignore this exception and let the caller know setting the replay failed.
        this.logger.warn("Unable to claim token for trackingEventProcessor {} on id {}", trackingEventProcessorName, index - 1, e);
        return false;
     } finally {
        this.logger.info("Starting replay for trackingEventProcessor {} on id {}", trackingEventProcessorName, index - 1);
        trackingEventProcessor.start();
     }
     return true;
  }
}

该服务将使用Axon配置来获取跟踪事件处理器并开始重播。

心得

使用重放管理一年以上后,我们希望将一些发现与大小分享。

事件重播中的障碍在跟踪事件处理中,事件是在不同的线程中处理的,这使得错误处理更加复杂。我们的错误处理不正确,每当发生错误时,重播令牌就会卡住。令牌存储中的令牌失去了所有者,我们不得不重新部署服务以让Axon分配新的所有者。 

重播API的粘性会话我们遇到了一种情况,我们在负载均衡器上使用了粘性会话,而跟踪事件处理器未在我们路由到的实例上运行,因此未启动重放过程。当您的实例达到负载平衡时,最好实现轮询并重复调用。 

重播批量。在谈论重放时,处理速度(时间)是非常重要的因素。根据复杂性,要​​重播的事件数和投影的内存利用率,重播可能要花费几分钟到几天的时间。为了优化您的重放功能,可以设置跟踪事件处理器的批处理大小。要了解这如何使您的重放体验受益,应该知道Axon处理事件时会发生什么。对于每个批次,Axon都需要更新跟踪令牌,如果您使用的是事务性存储,它将开始并提交数据库事务。如果您一次处理一个事件,这会带来很多开销,并且可能会受益于更大的批处理量。

举个例子 

事件重播运行状况指标发生令牌处理器无法赶上新事件的情况。可能有多种原因,因为顺序处理非常重要。您需要调整处理一个事件所需的时间。我们曾多次花一整天的时间来优化RDMS查询,以使其达到最佳播放速度。理想情况下,一旦令牌在总修订版上进行,对于更简单的类似于CRUD的事件,处理单个后续事件的时间不应超过100毫秒。但是,这仅仅是一个粗略的估计。一个事件表可以轻松地包含数百万个事件,如果想充分利用重播功能,则重播事件流将需要数小时而不是数天。可以记录有关令牌位置和事件范围之间差异的指标。

(banq注:其实利用Java 8 stream等流式概念可以方便重播事件数据)