Java反应式事件溯源之第5部分:事件存储

22-01-23 banq

选择事件存储数据库需要大量研究。这可能是一个单独的博客系列的主题(可能在将来),所以我的计划是从不同的角度来解决它。不要试图找到最佳解决方案(因为恕我直言不存在)。相反,尝试找到一个最佳的折衷方案并为改变做好准备。
使用Akka Persistence有一个巨大的优势,即持久性只是一个插件。我们可以将此库与任何能够履行AsyncWriteJournal合同的事件存储一起使用。我们只需要实现 4 种方法来使用我们的自定义事件存储解决方案:

@Override
public Future<Iterable<Optional<Exception>>> doAsyncWriteMessages(Iterable<AtomicWrite> iterable) {...}

@Override
public Future<Void> doAsyncDeleteMessagesTo(String s, long l) {...}

@Override
public Future<Void> doAsyncReplayMessages(String s, long l, long l1, long l2, Consumer<PersistentRepr> consumer) {...}

@Override
public Future<Long> doAsyncReadHighestSequenceNr(String s, long l) {...}


SnapshotStoreReadJournal的读取模型投影的情况类似。
我并不是说实现自定义 Akka Persistence 插件是一项微不足道的任务。这将是一个相当大的挑战,但至少是可能的(这是我同事的例子)。我们在未来不受今天在当前背景下做出的选择的限制。此外,我们可以推迟一些困难的决定,例如使用分布式事件存储(例如 Cassandra)还是使用单个主机(例如 PostgreSQL、MariaDB)。
对于初学者,有可用的插件,例如:


 

JDBC
经典的关系数据库适用于很多项目。我想大多数开发人员对这种数据库都有一些经验。要为我们的事件源行为启用Akka Persistence JDBC 插件,您所要做的就是添加适当的依赖项和一些配置:

akka{
  persistence.journal.plugin = "jdbc-journal"
}

jdbc-journal {
  slick = {
    profile = "slick.jdbc.PostgresProfile$"
    db {
      host = "localhost"
      port = "5432"
      dbName = "postgres"
      url = "jdbc:postgresql://"${slick-akka.db.host}":"${slick-akka.db.port}"/"${slick-akka.db.dbName}"?reWriteBatchedInserts=true"
      user = "admin"
      password = "admin"
      driver = "org.postgresql.Driver"
    }
  }
}


在我们运行docker-compose -f development/docker-compose-jdbc.yml起来之后,我们可以启动我们的应用程序,提出一些请求,并验证我们的关系型事件存储在event_journal表中包含一些事件。不要对event_payload列感到惊讶,因为我们的事件是以二进制blob的形式保存的。对于原型设计,我们使用的是Java序列化机制,当然这对于生产部署来说是一个非常糟糕的主意,应该改变。在这一部分,我们将不关注序列化本身(这需要一个单独的帖子),但如果你想阅读更多关于可能的选择,只需阅读我关于事件源的序列化策略的帖子。
这样一个解决方案的反应性如何?与事件存储进行通信的JDBC驱动是一个阻塞式的。的确,在写这篇文章的时候,R2DBC(反应式JDBC)驱动还不被Akka Persistence支持。我希望这在未来会有所改变。幸运的是,JDBC驱动被一个Slick库包裹着,它给了我们一个相当不错的、可管理的(一个独立的线程池)反应式门面。不要忘了,Actor不会因为与底层事件存储的交互而被阻断。
 

Cassandra
在某种程度上,单个主机数据库是不够的。垂直缩放将达到其极限并处理更多负载,我们将需要切换到分布式的东西。虽然,并非所有分布式数据库都会提高吞吐量。使用leader-follower 架构(例如MongoDB),我们仍然会受到单个主机(leader)的限制。我们应该关注像 Apache Cassandra 这样的无领导者解决方案。从写的角度来看,Cassandra 是事件存储的绝配,我们会得到:

  • 分区(数据分布在集群中的所有节点上),
  • 复制(如果节点发生故障,我们可以使用副本),
  • 针对写入进行了优化(吞吐量非常好),
  • 近线性水平缩放。

如果你曾经使用过 Cassandra,你应该知道模式建模是这个数据库的关键。Akka 持久性 Cassandra插件意识到了这一点,用于存储事件的表模式就像一个魅力。分区不会太小,但也不会太大(实际上,您可以配置单个分区应该有多少事件)。使用这样的模式,读取给定聚合的事件将非常有效(对于恢复阶段很重要)。通信的反应式驱动器将非常适合我们的设计。任何问题?当然 :) 在生产环境中维护分布式事件存储与单个主机数据库完全不同。这将需要大量的 DevOps 能力和知识(或者在托管解决方案的情况下需要资金)。此外,请注意,从所有聚合中读取所有事件可能会带来一些挑战。我想在某种程度上,我们根本没有选择。如果我们想进一步扩展,
好消息是我们不必在实现中进行任何更改即可使用 Cassandra。和以前一样,我们只需要一些依赖项和配置:

akka {
  persistence.journal.plugin = "akka.persistence.cassandra.journal"
}
akka.persistence.cassandra.journal {
  keyspace-autocreate = true //not recommended for production
  tables-autocreate = true //not recommended for production
}


使用part_5源码中的 Cassandra 插件运行我们的应用程序,我们需要-Dconfig.resource=/application-cassandra.conf在 Intellij 启动配置中添加 VM 选项或直接从命令行运行它:

./mvnw spring-boot:run -Dspring-boot.run.jvmArguments="--enable-preview -Dconfig.resource=/application-cassandra.conf"


 

概括
我希望你能感受到 Akka Persistence 的这一无可置疑的优势,我们可以使用内存中的事件存储进行原型设计,然后在生产环境中切换到 JDBC 事件存储,一旦你遇到一些重大负载,就切换到像 Apache Cassandra 这样的分布式事件存储. 对我来说,这改变了游戏规则。我可以把非常困难的决定推迟到最后一刻。我还应该提到,我们可以为每个聚合类型配置事件存储。对于会产生数千个事件的长期聚合,例如交易所市场、物联网,我可以使用 Apache Cassandra,但对于具有少量事件的聚合,我可以使用 JDBC 插件。

在使用完全不同的数据库时,肯定会有一些特定的陷阱和陷阱。事件迁移将需要一些工作,这将是一个过程。关系事件存储上的模式与 Apache Cassandra(或任何其他分布式数据库)上的模式完全不同,但至少是可能的。我们将来不会被阻止,从一个数据库到另一个数据库的神话般的切换可能是真正的交易。这是可能的,因为使用事件溯源,我们不必太关心数据之间的关系。我们需要存储一个事件流并读取事件流,这可以在许多数据库引擎上进行模拟。使用 Actor 模型,我们可以使用(分布式)单写原则,并安全地使用不提供 ACID 事务保证的数据库。
如果您想将 Akka 用于 CRUD 功能,还有一个具有持久状态持久性的选项。
像往常一样 - 查看part_5源码并使用该应用程序。
 

猜你喜欢