使用Kafka实现事件溯源


EventSourcing事件溯源是存储实体相关的事件流(实则是明细表),而不是直接存储实体的“当前”状态。每个事件都是一个事实,它描述了实体发生的状态变化(过去时态!)。众所周知,事实是无可争议的,不可改变的。
拥有这样的事件流可以通过折叠folding与该实体相关的所有事件来找出实体的当前状态; 但请注意,反过来不可能 - 当仅存储“当前”状态时,我们会丢弃许多有价值的历史信息。
事件日志是事实的主要来源:当前状态始终可以从特定实体的事件流派生。获取事件流和当前状态并返回应用这些事件流修改实体后的状态:Event => State => State。这是一个函数式管道,对于这样的状态修改函数,如果有一个初始状态值,那么当前状态是事件流的折叠folding。(状态修改函数需要是纯粹的,以便可以多次自由地应用于相同的事件)。

在卡夫卡存储事件
要解决的第一个问题是如何在卡夫卡存储事件?有三种可能的策略:

  1. 将所有实体类型的所有事件存储在一个主题topic中(具有多个分区)
  2. 每个实体类型对应一个主题,例如所有用户相关事件的单独主题,所有与产品相关的事件对于一个主题等。
  3. 每个实体实例对应一个主题,例如每个单个用户和每个单个产品的单独主题

第三种策略是不可行的。如果系统中的每个新用户都需要创建新主题,我们会得到无限数量的主题。这样以后进行任何类型的聚合也将非常困难,例如搜索引擎中中需要索引所有用户,需要使用大量的主题,此外这种方式并不是所有人都预先知道的。
因此,我们可以在1.和2之间进行选择。两者都有其优点和缺点:只需一个主题,就可以更容易地获得所有事件的全局视图。另一方面,对于每个实体类型一个主题,可以分别对每个实体类型流进行分区和扩展。两者之间的选择取决于用例。
还可以以额外存储为代价来获取两者:从实体类型主题all-events主题派生所有事件全局视图。

本文我们假设我们正在使用每个实体类型对应单个主题这样方案,但是很容易推广到多个主题或实体类型。
(编辑:正如Chris Hunt在Twitter上所说,Martin Kleppmann撰写了一篇非常好的文章,深入探讨了如何为主题和分区分配事件)。

基本的事件溯源存储操作
从支持事件源的存储中获得的最基本操作是读取特定实体的“当前”(折叠folded)状态。通常,每个实体都有某种形式id。因此,根据此id,我们的存储系统应可以返回其当前状态。
事件日志是事实的主要来源:当前状态始终可以从特定实体的事件流派生。为了做到这一点,存储引擎需要一个纯粹的(无副作用)函数,获取事件和当前状态并返回修改后的状态:Event => State => State。给定这样的函数和初始状态值,当前状态是事件流的折叠。(状态修改函数需要是纯粹的,以便可以多次自由地应用于相同的事件。)
Kafka中“读取当前状态”操作的简单实现将从主题中流式传输所有事件,过滤它们以仅包括指定事件id并使用指定折叠函数折叠它们。如果存在大量事件(并且随着时间的推移,事件的数量仅增加),这可能是一个缓慢且耗费资源的操作。即使结果将缓存在服务节点的内存中,仍然需要定期重新创建,例如由于节点故障或缓存逐出。
因此,我们需要一种更好的方法。这就是卡夫卡流(Kafka-streams)和状态存储发挥作用的地方。Kafka-streams在一组节点上运行,这些节点共同消费一些主题。与常规Kafka使用者consumer一样,为每个节点分配了消费主题的多个分区。但是,kafka-streams为数据提供了更高级别的操作,从而可以更轻松地创建派生流。
kafka-streams一个高级操作是可以将流折叠到本地存储中。每个本地存储仅包含指定节点使用的分区中的数据。开箱即用的本地存储实现有两种:内存中的实现和基于各种数据库的实现。
回到事件源,我们可以将事件流折叠到状态存储数据库中,保存每个实体的“当前状态”到本地数据库中。
以下是使用Java API(serde代表序列化器/反序列化器)将折叠事件转换为本地存储的方式:

KStreamBuilder builder = new KStreamBuilder();
builder.stream(keySerde, valueSerde, "my_entity_events")
.groupByKey(keySerde, valueSerde)
// the folding function: should return the new state
.reduce((currentState, event) -> ...,
"my_entity_store");
.toStream();
// yields a stream of intermediate states
return builder;

有关完整示例,请查看Confluent 的订单微服务示例
(编辑:正如Sergei EgorovNikita Salnikov在Twitter上注意到的,对于事件采购设置,您可能希望更改默认的Kafka保留设置,以便基于时间或基于大小的更新限制生效,并可选择启用压实compaction。)

查询当前的状态
我们现在创建了一个状态存储,其中包含来自分区的所有实体的当前状态,但是如何查询它?如果查询是本地的(同一节点),那么它非常简单:

streams
.store("my_entity_store", QueryableStoreTypes.keyValueStore());
.get(entityId);

但是,如果我们想要查询另一个节点上存在的数据呢?我们如何找出它是哪个节点?在这里,最近为Kafka引入的另一个功能是:交互式查询。使用它们,可以查询Kafka的元数据并找出哪个节点处理给定id的主题分区(这使用幕后的主题分区器):

metadataService
.streamsMetadataForStoreAndKey("my_entity_store", entityId, keySerde)

然后是将请求转发到适当的节点。请注意,如何处理和实现节点间通信 - REST,akka-remote或任何其他方式 - 这些已经超出了kafka-streams的范围。Kafka只允许访问本地状态存储,并提供有关给定状态存储的主机的信息id。

故障转移
状态存储看起来不错,但如果节点出现故障会怎么样?为某个分区重新创建本地状态存储也可能是一项昂贵的操作。由于kafka-stream重新平衡(在添加或删除节点之后),它可能导致长时间延迟或请求失败。
这就是为什么默认情况下持久状态存储需要logged日志的原因:也就是说,对存储的所有更改都会另外写入changelog-topic。这个主题是紧凑的(我们只需要每个的最新条目id,没有变化的历史,因为历史被保存在事件中),因此尽可能小。多亏了这一点,在另一个节点上重新创建存储可以更快。
但这仍然可能导致重新平衡的延迟。为了进一步减少它们,kafka-streams可以选择为每个存储保留许多备用副本(num.standby.replicas)。这些副本在应用时会使用更改日志主题中的所有更新,并且只要当前分区失败,就可以该分区下其他复制副本节点充当主要状态存储。

一致性
使用默认设置,Kafka提供至少一次交付。也就是说,在节点故障的情况下,可能会多次传递某些消息。例如,如果系统在写入状态存储更改日志之后但在提交该特定事件的偏移量之前失败,则会将事件实现状态存储两次。这可能不是问题:我们的状态更新函数(Event => State => State)可以很好地应对这种情况。但这也不必; 在这种情况下,我们可以利用Kafka的一次性保证。这些确切的保证只适用于读写Kafka主题,但这就是我们在这里所做的一切:更新状态存储的更改日志和提交偏移都是Kafka主题在幕后写的,这些可以在事务中完成。
因此,如果我们的状态更新函数需要,我们可以使用一个配置选项processing.guarantee打开精确一次的流处理,这会导致性能下降,但是 - 没有任何东西是免费的。

监听事件
现在我们已经涵盖了查询和更新每个实体的“当前状态” - 如果需要其功能(副作用)怎么样?在某些时候,这将是必要的,例如:

  • 发送通知电子邮件
  • 在搜索引擎中索引实体
  • 通过REST(或SOAP,CORBA等)调用外部服务)

所有这些任务都以某种方式阻塞并涉及I / O(这是副作用的本质),因此将它们作为状态更新逻辑的一部分执行可能不是一个好主意:这可能会导致速率增加“主”事件循环中的失败并产生性能瓶颈。
此外,状态更新逻辑函数(Event => State => State)可以多次运行(在故障或重启的情况下),并且通常我们希望最小化多次运行指定某个事件的副作用的数量。
幸运的是,在我们处理Kafka主题时,我们有很大的灵活性。更新状态存储的流阶段可以发送未更改的事件(或者,如果需要,可以修改),并且可以以任意方式消费该结果流/主题(在Kafka中,主题和流是等效的)。而且,可以在状态更新阶段之前或之后消费它。最后,如果我们想要至少一次或最多一次运行副作用,我们也可以控制。只有在副作用成功完成后才能通过提交消耗的事件主题的偏移量来实现至少一次。相反,在最多一次,通过提交偏移前运行副作用。
至于如何运行副作用,根据使用情况,有许多选项。首先,我们可以定义一个Kafka-streams阶段,它为每个事件运行副作用,作为流处理函数的一部分。这很容易设置,但是当涉及重试,偏移管理和同时执行许多事件的副作用时,这不是一个非常灵活的解决方案。在更高级的情况下,使用例如reactive-kafka或其他“直接”Kafka主题消费者来定义处理可能更合适。
一个事件也可能触发其他事件  - 例如“订单”事件可能触发“准备发货”和“通知客户”事件。这也可以使用kafka-streams阶段来实现。
最后,如果我们想将事件或从事件中提取的一些数据存储在数据库或搜索引擎中,例如ElasticSearch或PostgreSQL,我们可能会使用Kafka Connect连接器来处理我们所有的主题消耗细节。 

创建视图和预测
通常,系统的要求不仅仅是查询和处理单个实体流。还需要支持组合多个事件流的聚合。这种聚合流通常称为投影projections,折叠后可用于创建数据视图。是否可以使用Kafka实现这一点?

再次请记住,在基本级别,我们只是处理存储我们事件的Kafka主题:因此,我们拥有“原始”Kafka消费者/生产者,kafka-streams组合者甚至KSQL的所有权力来定义预测。例如,使用kafka-streams,我们可以使用代码或类似SQL的KSQL 过滤流,映射,按键分组,聚合时间或会话窗口等。
这些流可以持久存储并可用于使用状态存储和交互式查询进行查询,就像我们对单个实体流所做的那样。

走得更远
随着系统的发展,为了防止事件流无限增长,某种形式的压缩或存储“当前状态” 快照可能会派上用场。这样,我们只能存储一些最近的快照以及它们之后发生的事件。
虽然Kafka 没有对快照的直接支持,就像其他一些事件溯源系统的情况一样,使用一些已经提到的机制,例如流,消费者,状态存储,绝对有可能添加这种功能。

加起来
虽然Kafka最初的设计并未考虑到事件源,但它是作为数据流引擎设计,主题可复制,可分区,状态可存储和流API等都非常灵活。因此,可以在Kafka之上实施一个事件溯源系统而不需要太多努力。此外,由于幕后总是有Kafka主题,我们可以获得额外的灵活性,可以使用高级流API或低级别消费者