Scala 2014大会:建立一个Reactive应用案例


这是来自ScalaDays 2014大会上有关如何使用DDD EventSourcing/CQRS和Akka构建一个reactive的微服务应用系统的演讲,大意如下,英文PPT下载

这是以VPower系统为案例,在1.x阶段是一个整体Monolithic系统,使用的是Spring Hibernate和一些Scala实现,数据库是Postgres SQL.Server,这种铁板一块的系统带来可伸缩等很多问题,他们的目标是:
•Scala&&&Akka
• 模块化/分布式Modular/Distributed
• 松耦合Loosely&coupled
• 可扩展伸缩Scalable
• 失败冗余Fault&tolerant
• 快速响应Responsive
•不可变与领域模型 Immutable&domain&model
• 更少的数据表结构Schema@less&data&model

因此,他们选择了:
•DDD领域驱动设计Domain Driven Design
• CQRS
•最终一致性 Eventual-Consistency
• 事件溯源Event-Sourcing
• 无数据结构Schema-less
• 微服务Micro-service based
• RESTFul服务 Headless via Rest

具体实现工具是:
• Scala
• Akka
• Eventsourced/Akka3Persistence
• Spray.io
• Mongo
• Angular.js
• D3.js

这里省去DDD CQRS EventSourcing介绍。

在CQRS的Command中,我们关心的是行为而不是以数据为中心,这就导致DDD一个更加真实的实现。Command是系统的一个请求,用于执行一个任务或动作,比如:
• RegisterClient
• ChangeClientLocale

Command是imperative的。它们是针对可变状态的请求,它们代表客户端想采取的动作,以消息而非DTO方式传递。

•可以看成一系列方法调用
• command-handler-can-say-NO
• 内部状态不能暴露
• 仓储层变得简单。

CQRS命令handler是专门处理命令:
• 客户端以消息形式发送命令。
• 消息被command handler处理
• 命令是可以拒绝的
• 转换为一个或多个事件,这些事件能被持久化。

CommandHandler代码:


class ExampleProcessor extends PersistentActor {
var state = ExampleState() # <--- mutable state, but NOT shared = OK!
def updateState(event: Evt): Unit =
state = state.update(event)
...

val receiveRecover: Receive = { # <=== process persisted events on boostrap
case evt: Evt => updateState(evt)
case SnapshotOffer(_, snapshot: ExampleState) => state = snapshot
}

...
val receiveCommand: Receive = { # <=== process commands, if valid persist events
case Cmd(data) =>
persist(Evt(s"{data}")) { event =>
updateState(event)
context.system.eventStream.publish(event)
}
...
}

}

上面代码中receiveCommand方法是以消息形式接受客户端的一个命令,然后更改内部状态。(banq注:通常状态是由聚合根守卫,更改状态需要通过聚合根实体,这里将CommandHandler和聚合根混为一体,或者可能是将聚合根的修改状态职责迁移到commandHandler,这样做的原因估计是Akka的Actor因为要继承PersistentActor 这样非业务父类,失去直接作为实体纯洁性,因为聚合根实体是纯业务的,不能和任何技术框架绑定。)

接下来谈传统的CRUD增删改查的问题。
•DTO模型会远离领域,最后导致代码偏离需求
•getter方法会暴露内部状态,导致状态修改不一致
• DTO不同于领域模型的模型。
• 通常需要额外的映射转换。
•通常在仓储方法中有大量读方法,
• 实现针对查询优化比较难,因为读写捆绑在一起
• 其实查询的对象并不等同于数据模型。
• 对象模型需要被转换为数据模型。
• 对象和数据两种模型不匹配阻抗难以避免

使用CQRS的读写分离,Command负责写,而Query负责查询:
• DTO负责从存储中读。
• 无需复杂的ORM
• 属于根据屏幕直接抓取数据结构。
• 易于针对读优化
..

Eventual Consistency最终一致性
• 业务自己决定在读和写多长时间进行同步(NoSQL之类数据库也是采取读写同步,但是必须脱离业务在数据库产品中配置)
• 从写这边push推一个异步消息
• 读这边有监听
• 两段事务2PC不再需要
• 使用事件存储event store作为查询

下面是最终一致的读代码:


class BenefitsView extends View {
import EmployeeProtocol._
import BenefitsProtocol._
...
override def processorId = "employee-processor" # <=== identifies the processor
override def viewId =
"benefits-view"
def receive = {
case p @ Persistent(payload, _) =>
payload match {
case evt: EmployeeHired =>
val eb = BenefitDates(evt.id, evt.startDate, Nil, Nil, Nil)
...
}
}
}


[该贴被banq于2014-07-05 13:18修改过]

使用ES建立一个Reactive 购物车是该演讲的第二大部分。

在大部分系统中是依据状态来处理事务的,比如关系数据库等,这种方式有其副作用,高性能关键系统不适合。

而我们截获事件,是一种很自然的行为方式,可以跟踪业务交易行为本身,并不需要记录当前状态,状态由事件即时计算产生。

以购物车为案例传统CRUD方式是:
1. Cart被创建
2. Item 1 @ $30 added
3. Item 2 @ $15 added
4. Item 3 @ $12 added
5. Item 4 @ $5 added
6. 购物车信息added
7. 总数计算出@ $62
8. 订单 123 插入表中

如果顾客在订单装箱运送之前改变主意了,CRUD如下操作:
1. Order 123 从表中fetched
2. Item 2 @ $15 removed
3. Total @ $47 regenerated
4. Order 123 updated

这时订单123的当前状态被保存:


这笔业务的当前状态是有三个条目,总数是$47元。

如果现在经理要求团队给出一个有关顾客从订单中删除条目的报告,这时就无法完成,因为数据库中只有订单当前状态。

传统关系数据库和静态数据模型的问题:
1.只能查询和持久当前状态。
2.领域模型和数据模型紧耦合在一起
3.导致抽象泄漏。
4.导致贫血模型。
5.丢失用户操作意图(如上面用户删除条目)


使用事件来跟踪用户行为,每个交易动作都会被记录,当前状态通过播放事件也就是交易行为获得。

事件是代表已经发生的,但是与Command区别是,事件是不能被拒绝的。如
•ClientRegistered
• ClientLocaleChanged

下面看看如何使用ES实现购物车,购物车的操作有:
1. Cart created
2. Item1'@'$30'added
3. Item2'@'$15'added
4. Item3'@'$12'added
5. Item4'@'$5'added
6. Shipping information added
7. Order '123' event stream inserted

如下图:


如果这时顾客改变主意,更新了购物车:
1. Order 123 event stream fetched
2. Item 2 @'$15 removed event
3. Order'123' event stream appended追加

我们只要把顾客操作的行为作为事件原原本本地记录追加到原来的事件系统中即可。

通过重新播放这个对象的事件流,对象状态能达到最新当前状态。


这样的好处是:没有对象的结构化表示,只要重播之前的交易行为即可,数据不是被结构化地保存到事先设定结构的数据库中,状态结构是可变的。事件是不可变的,代表一系列交易行为,领域模型的状态和数据库存储的状态不会发生耦合。

相比CRUD好处是:
没有updates或delete操作,每件事都作为事件存储,只以append追加方式,删除和更新被可以追加的事件替代。

技术上的影响:
存储系统成为架构上的附加了,而不是主要核心,只有追加append的架构容易分布,因为没有修改删除,就更少机会需要处理锁了。比传统数据库水平分区更易于扩展。

业务上的优势:
关键业务操作都被事件流记录,你能回答系统从开始的任何操作问题,事件流相当于操作日志,最直接自然的审计日志。



[该贴被banq于2014-07-05 13:55修改过]

marked...
CQRS/ES概念非常好,但是在实际系统上操作起来还是有一定的难度,虽然简单,但却不容易啊。

采用event sourcing的系统,在修改聚合根之前的前置业务规则检查时,有时会很难处理。比如我现在有一个版本聚合根,版本有版本号和状态,然后状态总共有4种,然后我们希望版本的其中三个状态的版本号要唯一;

此时,如果使用cqrs+event sourcing的架构,该如何实现这个场景呢?

2014-07-08 10:15 "@tangxuehua"的内容
本的其中三个状态的 ...

这样的情况可能需要引入一个Versions作为聚合根,Version作为其内部维护的实体才可。

2014-07-10 14:37 "@JasonMing"的内容
这样的情况可能需要引入一个Versions作为聚合根,Version作为其内部维护的实体才可。 ...

当版本很多时,这样的做法会死人的。性能非常低下

感觉不太会吧。
N个版本,4种状态,其中三个是互斥的,应该就是类似
Release: v10
RC: v9
Beta:v8
Other: v1~v7
这样的吧
前三个当作内部的状态,有新版本要取代其中一个,去校验这三个特殊版本。
真心看不出慢在哪里了,遍历都没有,就是一个常量级别的比较啊。

你没有理解我说的意思。

我的意思是,版本是一个聚合,每个版本都有自己的状态。然后我说的不重复是指当你要新增一个版本聚合的时候,要保证它的状态不能重复,判断是否重复时要判断的版本聚合根的条件就是我一开始说的可能有三种状态的版本要参与比较;

这种情况下,如果版本聚合根有100W个,那你也要把这100W个聚合根的版本信息都放在Versions聚合根里吗?这样做会导致性能低下的。
[该贴被tangxuehua于2014-07-11 15:02修改过]