关于EventSourcing事务的问题

关于EventSourcing事务的问题,还是有点不太明白。
在事件驱动编程中,数据持久化是异步,那如果持久化失败,我如何在业务流程中得知并处理,如果要等到数据持久化成功才进行下一步操作,不是又变成同步模型了么
@banq 老师,心中疑惑,百思不得其解
[该贴被tecentIDA8F53于2013-11-19 10:32修改过]

2013-11-19 10:20 "@tecentIDA8F53"的内容
那如果持久化失败,我如何在业务流程中得知并处理 ...

这是可靠性问题,持久化成功与否的结果不构成影响业务流程,业务事务不必依赖持久化来完成,持久化也不是构成业务事务的一个环节。

要做到持久化成功,可以采取重试方式,如果重试超过一定次数,呼唤人工介入。

EventStore实际是一种Domain 日志,我们平常看到的是网站访问日志,EventStore也是一种日志。用日志角度看EventSourcing就比较释然。

@banq 您的意思是,持久化只是可靠性的保证,业务的事务与持久化无关,可靠性的保证是:比如数据库宕机,在故障恢复后,之前未能持久化成功的数据能写回库内?
[该贴被tecentIDA8F53于2013-11-19 11:24修改过]
[该贴被tecentIDA8F53于2013-11-19 11:24修改过]

能具体点么,比如我们业务中的一个场景,
一个订单比如,4中状态,下单,处理中,结果(成功失败)
其中处理是依赖N个三方系统(使用长链接通信),第三方的系统并不能马上处理返回结果,也是需要等待。
常规的做法是
1.用户下单,写库(记录数据和状态),发送jms
2.接收到jms,从库中读取数据,开始处理,调用三方系统,等待处理完成,更新库中的状态,发送结果jms
3.接受结果jms,从库中读取数据,给客户发送结果
如果第一步持久化失败,后面的就无法继续了。

要根据DDD的思想改造,使用事件驱动,该如何改造呢,订单domain总不能一直存在内存中吧,如果持久化失败,服务器也宕机,那等于就算订单数据丢失了

[该贴被tecentIDA8F53于2013-11-19 11:42修改过]
[该贴被tecentIDA8F53于2013-11-19 11:43修改过]
[该贴被tecentIDA8F53于2013-11-19 11:43修改过]

2013-11-19 11:33 "@tecentIDA8F53"的内容
使用事件驱动,该如何改造呢,订单domain总不能一直存在内存中吧,如果持久化失败,服务器也宕机,那等于就算订单数据丢失了 ...

可能你思路还没有从传统请求思路切换到事件驱动上来。
建议看看:http://www.jdon.com/eda.html

注意到D-EDA,通过领域模型内部发出的领域事件来作为消息的来源。

领域事件发出的事件作为日志保存然后作为回放,称为Event Sourcing。这个回放过程实际类似JTA等同步事务的回滚一样。

JTA的同步事务都是在内存中有一个监视锁,如果同步事务工作期间,掉电怎么办?所以,我们假设比较的前提要一致。

掉电等可以使用备份冗余来,内存中有一个订单,通过复制策略复制到备份服务器上,掉电了可恢复,这些技术在分布式技术中相当成熟,已经被用在大数据实时分析中。
见分布式系统:
http://www.jdon.com/DistributedSystems.html

下面回答如果持久化失败怎么办?我之前已经回答了,持久化失败后自动重试,多试验几次,因为是日志追加,不存在写操作争夺的因素,这种成功概率要高于平时数据库读写操作。

只有日志追加成功,这个数据才会从消息队列中删除,否则一直留着,重试次数大于10了,呼叫人工干预即可。消息队列也是有备份冗余的。具体可见LMAX架构:http://www.jdon.com/42452

最后重申一下,在新的架构中,至少有两个逻辑层:业务领域模型和底层仓储,也就是持久化。低层次的问题不会反映到高层次上,因此,底层仓储出现问题,不会打扰领域模型之间的业务流程。持久化失败如同日志写失败,这个靠容错性等措施完全可以保证,不必去干扰正常的业务,打个比喻,法庭上有笔录,笔录人员不会说,你们辩论停停,我笔坏了,等我找支新笔或换台新计算机,你们再继续辩论。当场晕倒。

你举的这个案例,实际是将这两层合并在一个平面上。按照DDD和事件驱动应该是:
1.用户下单。(异步后台持久化写库)
2.返回客户 http结果 200,这是REST的标准响应。
3.用户再次发出REST的get请求,对刚才下单的订单查询。
4.返回内存中订单。(一般这样一个来回,第一步异步持久化的订单也正常完成了,可以在此步做个检查)。

非常感谢 @banq 老师的热心解答,的确思维转不过来
事件回放,这么一解释明白了许多,事件回放 有具体的例子么,我看了足球比赛等相关的例子,貌似都没有回放的部分。

而且事件回放和传统意义的事务回滚貌似不太一样,因为低层次出现问题不影响高层次,持久化出现异常也不应该影响业务,因此写库失败而导致的下单异常也是不会发生的,处理方法应该是故障恢复后重新写库。

@banq 老师你文章中提到的内存,应该都是指memcached之类的服务型缓存吧。

还有一个问题:我们现在消息系统是无序的,当消息量大的时候,消息的消费存在一定时间的延时,这个如果使用在事件驱动中消息的发送和处理有影响吗?
[该贴被tecentIDA8F53于2013-11-19 14:34修改过]

2013-11-19 14:29 "@tecentIDA8F53"的内容
内存,应该都是指memcached之类的服务型缓存吧 ...

呵呵,这里内存指In memory之类,和应用程序在一个VM中,不是memcached之类的服务型缓存。

in-memory是通过数据网格datagrid之类产品实现容错和冗余的,可见terracotta等产品。

消息系统建议使用Apache Kafka,ZeroMQ或Netty。
事件存储和回放很简单,就是几条SQL语句,看你使用什么数据库,这方面我建议和大数据分析联合起来,因为Domain日志比网站操作日志更有价值,作为大数据的输入数据来源。

EventStore有几个产品,参考:
http://www.jdon.com/tags/19815
[该贴被banq于2013-11-19 15:47修改过]

我觉得这里问题还有很多。比如:
1.既然事件是异步持久化的,也就是我们不依赖于事件的持久化结果就可以让domain做下面的事情了,那就是说数据一致性是完全由domain保证的;不知道banq是如何保证内存中的数据一致性的,特别是并发修改的时候的数据一致性;如果是redis这种分布式缓存,那一致性还好做一点,因为内存时共享的;而如果是本地jvm内存,那集群情况下,一致性就保证不了了;

2.使用in-memory+event sourcing的架构,一次聚合根的操作流程是:
1)controller发送command;
2)command handler从内存(redis)获取聚合根引用;
3)调用聚合根方法执行业务逻辑;
4)聚合根产生领域事件;
5)将更新后的聚合根保存到内存(这一步要做并发控制);
6)如果保存成功,则把事件放在Disruptor(是这样吗?);
7)另一个线程A从Disruptor不断读取事件,然后持久化到eventstore;
8)另一个线程B从Disruptor不断读取事件,然后发布事件给所有订阅者(是这样吗?);

假如现在第五步成功,也就是redis中数据更新了,但是此时机器断电了,则disruptor中的还未被消费的消息就丢了。这种情况如何处理呢?
[该贴被tangxuehua于2013-11-19 16:06修改过]

2013-11-19 16:04 "@tangxuehua"的内容
EventSourcing ...

@banq 说的应该是 利用terracotta这类产品来使得多台机器之间的内存得到共享

2013-11-19 17:03 "@tecentIDA8F53"的内容
利用terracotta这类产品来使得多台机器之间的内存得到共享 ...

是的,数据网格和分布式缓存Redis是有区别的:
http://www.jdon.com/artichect/scalable9.html

总体来说,数据网格是几台服务器内存中数据是一样的,内存复制,优点是适合聚合数据,缺点也是聚合数据的缺点,无法切割,伸缩性不够好,处理能力没有纯分布式扩展能力强,memcached是不同服务器上放不同的数据,有多少数据排多少台服务器。
[该贴被banq于2013-11-19 19:24修改过]

没有例子,光说一些道理。总没有说服力。

2013-11-19 16:04 "@tangxuehua"的内容
特别是并发修改的时候的数据一致性 ...

并发修改采取单写原则,见:
http://www.jdon.com/performance/singlewriter.html
参考:http://www.jdon.com/concurrency.html

Actor模型:
http://www.jdon.com/actors.html

了解以上背景知识后,流程应该是这样:


1). command进来需要修改聚合根某个方法
2) 这个聚合根不在系统内存中,需要从仓储中获取。放到缓存中,保持最经常使用的在内存中。
不能使用REDIS这样的需要通过Socket的服务型缓存,而是要和程序在一个JVM中的缓存,必须也是Java编写。如果是Redis外部缓存,处理方式和数据库处理一样的。
3). 执行方法,比如更改状态等,这时只有一个线程更改状态,是单写原则,类似Node.js


@[author]Model[/author]
public class AggregateRootA {

private int state = 100;

@[author]Inject[/author] //event Observable(Producer)
private DomainEventProduceIF domainEventProducer;

@[author]OnCommand[/author](
"CommandtoEventA"//command Observers(Consumer)
public Object save(ParameterVO parameterVO) {

//以单线程方式更新状态 (Single Writer)
this.state = parameterVO.getValue() + state;

//a reactive event will be send to other consumers in domainEventProducer
return domainEventProducer.sendtoAnotherAggragate(aggregateRootBId, this.state);

}

}


4).状态更改后产生领域事件。 domainEventProducer.sendtoAnotherAggragate
5).领域事件发到disruptor中
6)另外线程从disruptor从读取事件。
7)这个线程读取事件后按类名字母排名顺序执行相应的消费者代码
8)在消费者consumer中,如果是CQRS架构保存事件到Eventstore,如果是SOA架构就保存当前聚合根状态到数据库,执行其他业务逻辑。

以上是Jdon框架的执行顺序原理。
更详细参考:http://www.jdon.com/jdonframework/cqrs.html
[该贴被banq于2013-11-22 06:49修改过]

1. 聚合根保存在本机的jvm中,那集群时,聚合根不就会同时在多个jvm里存在了吗?那两台机器里的聚合根同时修改了,那如何处理这种并发修改呢?

2. 保存事件是由消费者来保存的?那如何保证消费者一定会保存事件呢?disruptor是基于内存的环形队列,虽然速度快,但不支持消息持久化,也就是断电后消息就丢了。

2013-11-22 16:52 "@tangxuehua"的内容
那两台机器里的聚合根同时修改了,那如何处理这种并发修改呢? ...

我们之前讨论过这个问题,同一个 聚合根因为是聚合数据,不能在两个以上服务器内存存在,只能存在一个服务器上,但是可以采取类似数据库碎片sharding方式,比如Order聚合根在A服务器,支付聚合根在B服务器上。所以,不存在同一个聚合根在两个机器被修改情况。

至于聚合数据内部,因为聚合这个结构关系,无法切分,所以,寄希望于Scale up/down,而不是scale out,也就是每台服务器使用多核来处理,每个聚合根修改使用单线程,一个核一个线程,无堵塞,核数越多,并行机会越大,性能越快,这是采取disruptor单写修改状态的原因。

关于Disruptor的容错问题,容错通常采取冗余备份,两台Disruptor服务器,其中一台做备份,LMAX里就是这么干的。

希望你能看到这个帖子,应该以后不会再有类似相同疑问了吧。


[该贴被banq于2013-11-22 17:31修改过]
[该贴被banq于2013-11-22 17:31修改过]

好吧,

我不知道你如何能做到一个聚合根只会在一台机器上。尤其是在我们对集群做动态扩容的时候。假如一个聚合根一开始只在A服务器上,后来由于增加服务器,所以,发现应该到B服务器去处理该聚合根,所以该聚合根会从B服务器上去找,但是此时该聚合根因为不在B服务器上的JVM里,所以你会同repository加载到B服务器的内存,然后在操作该聚合根。

但是因为此时A服务器上的input disruptor里可能还有消息会对A服务器上的聚合根做修改。

那不就出现我说的一个聚合根在两台服务器上被同时修改的情况了吗。

如果这点你能保证不会出现,那我就没问题了,呵呵。
[该贴被tangxuehua于2013-11-22 17:38修改过]