持久性Akka、Kafka、Cassandra实现CQRS数据同步

18-06-29 banq
                   

本文是讨论数据库在读写分离情况下,如何实现写数据库的数据如何快速更新到读数据库的三种方式。

Akka Persistence(持久性Akka)是一个相当不错的事件溯源EventSourcing实现。当我们选择EventSourcing架构时,自然同时也会采用CQRS,CQRS是将查询操作与持久性的写操作分离,这样事件存储数据库和事件查询数据库就是两个不同数据库,这种读写分离带来的代价是最终一致性,所以最大的问题是:如何有效快速地更新读取模型?

先看看CQRS/ES的基本要求:

1.读数据库应基于保存事的数据库进行更新,
2.必须保证没有事件可以丢失,
3.事件顺序必须保证。

前两点非常明显,如果无法实现事件的顺序性,则根本不会选择事件溯源这个架构。为满足上述这些要求,选择合适策略在很大程度上取决于你的领域(你拥有多少持久Actor,你正在制造多少事件等等)以及用于存储事件的底层数据库。在写这篇文章的时候,我选择存储事件(和快照)的武器是Apache Cassandra--一个高度可扩展的分布式数据库。现有的Cassandra插件已经多次证明它是稳定的并且可以生产。有些传闻说Scylla是更有效的存储,但它仍然处于研发阶段。

通常情况下,我们会有多个不同的读模型。其中一些读操作比其他更重要,一致性要求可能特别高,我们需要可以针对每个读模型进行扩展,下面几个方案是如何快速将持久的事件快速同步到查询数据库。

Akka Persistent Query
第一种方法非常简单,我们可以使用Akka堆栈中的内置解决方案,即Persistence Query(持久性查询)。这个想法如下:

1.连接到事件日志数据库并将事件作为流提供。
2.更新读取模型数据库。
3.保存处理后的事件序列号。


val eventJournal = PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

eventJournal
.eventsByPersistenceId(persistenceId, startingSequenceNr, Long.MaxValue)
.via(updateSingleReadModel)
.mapAsync(1)(saveSequnceNr)


保存序列号对于恢复阶段是必需的,这样您就不会从头开始处理事件。

简单而优雅的解决方案,对吗?不幸的是这种方案不是响应式Rective的。默认情况下Cassandra时间间隔为3秒。起初,这可能很好,但假设生产中有10000个持久性Actor,10000虽然不是一个非常大的数量,但足以杀死你的应用程序。对于每个持久性Actor,都将需要启动一个流,请相信我10000个流真不是最好的主意。实际上,如果要独立更新读取模型数据库,应该将持久性Actor的数量乘以读取模型的数量。

相比采用eventsByPersistenceId,可以使用使用eventsByTag查询被标记的事件。在大多数情况下,这种方式这工作得也很好,但你可能会面临事件分发的问题。假设大部分事件是由1%的持久性Actor产生的标记,可能导致其他99% 持久性Actor的事件处理滞后,因为所有事件都集中在同一个Actor源中了。解决方案可能是对标记进行分片,Lagom框架就是这么实现的。

不幸的是,没有任何技巧完美可以解决数据库轮询数据的问题。3秒滞后在其他情况下可能不成问题,对于某些情况即使0.5秒却也不能接受,同时,太小的时间间隔也会造成底层数据库不必要的负载,该分析是否有其他选项了?

CDC
是否可以直接从数据库中通过流方式传输数据呢?Cassandra(与大多数数据库一样)支持CDC机制。理论上,很容易连接到Cassandra的更改日志并使用这些日子来更新读取模型。太好了,但这里有一个问题。Cassandra是一个分布式数据库,因此每个数据库节点都有一个单独的CDC日志文件,而且管理这些服务器日志以确保整体事件顺序性会是一场噩梦。此时,CDC方式应该被认为是“有害的”。

Kafka作为一个数据库
如果从Cassandra的读取效率确实是一个大问题,也许我们可以使用像卡夫卡Kafka这样的消息队列作为事件存储?从卡夫卡中读取事件流是非常有效的。每个读模型数据库将由不同的卡夫卡消费者更新,每个都是一个独立的过程,完全自主,独立和独立可扩展。整个概念在我们之前的博客文章中有详细描述。对于某些应用,这种方法可以顺利运行。但是,在某些情况下,Kafka(或实际上任何消息队列)作为数据库可能带来很多其他问题:

1.快照管理。
2.Retention滞留管理(Retention应该可能被禁用)。
3.Kafka分区 - 为了保持顺序,来自聚合的所有事件必须放入单个分区,该分区必须适合单个节点。在一些重载情况下,这可能是一个阻碍或需要解决的挑战。
4.Akka Persistence不支持。

卡桑德拉,卡夫卡和至少一次交付
如何将两个概念结合在一起?Cassandra用作存储事件的数据库(真相的来源),Kafka负责将数据处理写入到读取模型的数据库。




理论上,这是完美的。唯一的问题是如何有效地把事件发给卡夫卡,可以使用前面所述持久Actor的eventsByPersistenceId或eventsByTag方式将Cassandra的事件发给Kafka,但滞后3秒和分发不平均问题仍将存在。

这里还有一种方法是:在持久性Actor存储事件到Cassandra数据库以后立即向Kafka发送事件。持久性Actor的算法很简单:

1.接收命令
2.保存事件(S)
3.插入Cassandra数据库
4.发送事件给卡夫卡

我们来看看这里可能存在的问题:为了确保事件顺序性,发送给Kafka必须阻塞整个Actor,这当然也是一个坏主意,因为它会降低持久性Actor的性能。我们可以将卡夫卡生产者委托给另外一个独立的child actor(我们称之为KafkaSender)。这样就太棒了,但是我们还必须确保这两个Actor之间的消息传递。这可以通过使用AtLeastOnceDelivery特征来完成。至此,你已经可能觉得好像有些复杂,其实没有什么是免费的,确实,更多的消息会在我们的Actor集群中循环传播,我们也可能会失去顺序:

至少一次投递意味着原始邮件发送顺序并不总是保留。

考虑使用一些事件缓冲机制以确保事件顺序?请停止!至少一次交付能以不同的、更乐观的方式完成。您可以将事件发送到KafkaSender而无需确认发送,但您需要监视事件序列号,如果序列号有任何差异,则需要采取额外的措施:

1.如果序列号低于当前序列号 - 事件已经处理完毕,可以跳过

2.在间隔高于1的情况下,存储消息并启动eventsByPersistenceId以填补空白,并清除挂起的消息

当然,当前序列号应该在KafkaSender失败或持久Actor重启后保持并恢复。听起来很复杂?诚然,但没有人说高可扩展性是便宜的。(banq注:可以采取Kafka 1.0以后的事务性消息)

结论
最后一个方案显然是最真实、最有效、也可扩展的,但却最难实施,如果你确实需要这种级别的可扩展性,那么也要接受不可避免的复杂性。这里我们讨论了几种如何将存储数据库的事件同步到读数据库的方案,只要能满足预期的延迟和吞吐量,按照你自己的实际情况选择即可,如果需要强一致性,比如将事件溯源架构的内部延迟压缩到最后一毫秒,那么这篇文章也提供了这种复杂的解决方案。


Scalable read model updates in Akka Persistence –

[该贴被banq于2018-06-29 19:39修改过]
[该贴被admin于2018-07-01 08:59修改过]
[该贴被admin于2018-07-01 09:03修改过]

                   

1