Event Sourcing在分布式系统中应用

本文来自原文:Event Sourcing at Global Scale,谈论了如何在应用程序中通过开发人员自己实现基于ES的CQRS分布式系统,是一种牺牲一致性(最终一致性)换得低延迟高可用的分布式方案。实际是Vector Clock算法应用,相当于自己实现类型Dynamo,Project Voldemort等NoSQL机制。

因为国际客户的存在,我们开始探索如何基于Event Sourcing分布化应用,主要驱动原因是地理位置距离的原因(称为sites站点),从不同地理位置访问应用应该有低延迟,每个站点site应该在最近的数据中心运行应用,应用数据应当跨所有sites复制,每个站点如果存在内部网络分区情况下应保持写入的可用性,当分区修复后,从不同站点的更新应该能融合merge,冲突应该能自己解决。

Eventuate是这个项目的原型,继续不断完善推向生产环境。

这个案例中有6个站点,(A-F),以不同进程运行在本地localhost,通过改变配置,站点也可以分布到多个主机上,站点A-F可通过双向连接 异步的事件复制连接:


A E
\ /
C -- D
/ \
B F

每个站点都必须配置:
1. 一个Akka Remoting 的hostname和port (akka.remote.netty.tcp.hostname 和akka.remote.netty.tcp.port)
2. 本地事件日志复制的id(log.id 是站点的id)
3.到其他站点的复制连接列表(log.connections)

对于这个案例,site C配置如下:
akka.remote.netty.tcp.hostname = "127.0.0.1"
akka.remote.netty.tcp.port=2554

log.id = "C"
log.connections = ["127.0.0.1:2552", "127.0.0.1:2553", "127.0.0.1:2555"]

开始这个原型时,我们使用了akka-persistence,但是不久发现 akka-persistence (2.3.8)并不适合我们的概念和技术要求,于是决定对 akka-persistence API 进行一些修改结合我们的geo-replication。这些拓展不仅能适用geo-replication场合,而且可克服akka-persistence一些当前限制,比如event-sourced actor必须是集群范围内的单例,而我们的目标是,允许几个actor实例在多个节点并发更新,如果冲突发生也能解决,这样我们就支持来自几个事件生产者的事件聚合与汇聚,这样能确定地对所有这些事件进行重放。

在我们上面原型中 geo-replicated event-source应用是跨站点分布,每个站点在一个独立的数据中心,为了低延迟访问,用户选择与其地理位置最近的一个站点交互访问。

事件日志
系统核心是全局可复制的事件日志,它拥有事件的happens-before关系,它是使用向量vector时间戳来跟踪的,这些时间戳是由一个或更多向量时钟vector clock在每个站点产生,并与事件一起保存到事件日志中,通过比较向量时间戳,一个站点可决定任何两个事件的happens-before前后关系或同时发生。见这个案例源码

事件的部分顺序是由向量时间戳决定的,保存在每个本地日志,如果e1 -> e2,那么offset(e1) < offset(e2),这里->表示happens-before关系,而offset(e)事件e的在事件日志中的位置或索引,举例,如果一个站点A写入事件e1引起站点B的事件e2,那么复制协议确保e1总是在所有站点本地日志存储中先于e2存储。

存储顺序很重要,事件的生产和消费者才能从事件日志获得可靠复制,引起相应顺序事件广播,如果emit(e1) -> emit(e2),那么在所有站点所有应用将消费e1先于e2,这里->是happens-before 关系,emit(e)代表事件的写入发生,保存因果顺序非常重要。

状态复制
通过复制日志记录,应用状态能够在不同站点重新建立,在站点网络分区中,站点必须保留状态更新和复制的可用性,因此,如果发现冲突,必须解决,更准确说,在内部站点网络分区中,冲突的更新不会发生。

举例,站点A对领域对象x1做了更新,那么更新事件e1被写入事件日志,一段时间后,站点A接受到来自站点B的更新事件e2,针对同样领域对象x1,如果站点B在发射e2事件之前已经处理了e1,那么e2因果关系是依赖e1,那么站点A只是简单将e2更新应用到x1,在这种情况下,两个更新e1和e1都已经被应用到两个站点的x1上,而x1的拷贝也会变成同样值,另外一个方面,如果站点B与站点A同时更新x1,那么冲突就发生。

同时发生事件是否冲突完全取决于应用逻辑,例如针对不同领域对象同时修改对于一个应用也许是可接受的,而针对同一个领域对象的同时修改会被看成是冲突,必须解决,任何两个事件是同时或 happens-before前后顺序关系都能通过比较它们的向量时钟获得。

冲突解决
如果应用状态是使用 commutative replicated data types (CmRDTs) 建模的,状态更新操作是通过事件复制的,并发修改一点不是问题,在这种情况下,事件复制甚至不需要保留因果顺序,我们应用中许多状态更新操作不是满足交互律commutative 的,我们需要支持冲突解决的交互式和自动化两个方式。

应用状态冲突版本在一个同时并发版本树中被跟踪,这是一个根据事件的向量时钟构建的数据结构,对于任何类型s的状态值和任何类型A的更新,并发版本s能以数据类型
ConcurrentVersions[S, A]方式被跟踪,并发版本能针对不同领域对象甚至领域对象字段独立跟踪,这依赖于应用需要发现和解决冲突的粒度。

在交互式冲突解决中,一个用户选择一个冲突版本作为"winner",这种选择是作为一个明确的冲突解决事件存储到事件日志中,这样以后用户交互介入时需要事件重放,这样可能会进行人工干预下的冲突事件融合merge,这种情况下,冲突解决事件必须包含融合细节以便融合本身也可重新再来一次。

在自动化冲突解决下,为了选择一个"winner",应用一个定制的冲突解决函数到冲突版本(vector clock算法?),冲突解决函数会自动融合冲突版本,然后我们就进行了convergent replicated data types (CvRDTs).

注:按照CAP定理,CRDT是保证了可用性A和分区性P(AP),实现最终一致性;而Cassandro代表的Paxos是保证强一致性C和分区性P(CP)。