使用Disruptor实现并发编程 PPT文档

12-03-09 banq
2012年Qcon伦敦大会3月7日到9日在伦敦召开,所谓实践出真知,Qcon英文大会可谓是世界上战斗在实践探索第一线的顶尖高手分享大会,也是一次软件创新大会。

这次大会除了云计算架构之外,有三个具体领域:移动 Scala和Java平台,而在Java平台中,比较令人关心的是LMAX所做的Disruptor报告,DDD推动者gregyoung在其Twitter微博上说,很想去听LMAX报告,可惜挤不进去了,由此可见LMAX架构的热度。

LMAX的报告主题是:

Concurrent Programming Using The Disruptor使用Disruptor实现并发编程,主要讲述了如何使用Disruptor 2.8如何进行并发编程,Disruptor核心是其神奇的RingBuffer,见下面附图:

使用Disruptor有几个步骤,以一个生产者,一个消费者为例子:
1. 首先需要一个创建事件的工厂,实际是一种事件生产者:

private static class SimpleEventFactory implements EventFactory<SimpleEvent> { 
        public SimpleEvent newInstance() { 
            return new SimpleEvent(); 
        } 
}
<p class="indent">

其中SimpleEvent是你自己的POJO事件对象。

2.创建事件处理器,消费者获得事件后,需要激活事件处理器EventHandle:

public class SimpleEventHandler implements EventHandler<SimpleEvent>{ 
    private List<String> valuesSeen = new ArrayList<String>(); 
    @[author]Override[/author]  //这是事件处理器主要的方法,将被自动在另外一个线程激活执行
    public void onEvent(final SimpleEvent event, 
                        final long sequence, 
final boolean endOfBatch) throws Exception { 
        valuesSeen.add(event.getValue()); 
    } 
<p class="indent">


3.然后将事件,事件处理器和RingBuffer装载在一起:

//创建一个RingBuffer,事件处理器等待策略是Yield方式,比较吃CPU
final RingBuffer<SimpleEvent> ringBuffer = new RingBuffer<SimpleEvent>(SimpleEvent.EVENT_FACTORY, 
                new SingleThreadedClaimStrategy(RING_BUFFER_SIZE), new YieldingWaitStrategy()); 

final SimpleEventHandler eventHandler = new SimpleEventHandler(); 
final BatchEventProcessor<SimpleEvent> eventProcessor = 
    new BatchEventProcessor<SimpleEvent>(ringBuffer,
                                         ringBuffer.newBarrier(), 
                                         eventHandler);
eventHandler.setSequence(eventProcessor.getSequence()); 
ringBuffer.setGatingSequences(eventProcessor.getSequence());
<p class="indent">

以上装载wire到一起后,只要调用ringBuffer.publish()就在当前线程开启发送事件,SimpleEventHandler.onEvent在另外一个线程被激活运行。

这之间原理如图JDK的队列等概念,好像是线程通讯的基础用法,但是RingBuffer相比JDK提供的那些有锁Queue或LinkedList,其特点是无锁,可见本站以前关于Disruptor的帖子

该PPT中还列举了一个生产者,多个消费者的使用方式。

[该贴被banq于2012-03-10 17:43修改过]


27
banq
2012-03-10 17:44
同时,本站基于最新Disruptor 2.8的Jdonframework 6.5.1版本也发布了:将原来的Disruptor 2.0升级为最新的2.8版本,每个消息topic对应一个固定的RingBuffer,重用性高,能够发挥RingBuffer为CPU高速缓存优化的特点。

Disruptor 2.8支持四种WaitStrategy策略:BLOCKING是不耗CPU,但是吞吐量不大,如果你的CPU负载高了,那么切换到SLEEPING,这是CPU耗费和吞吐量之间的平衡,空闲时CPU有一定负载,Linux下比Windows平台显著;YIELDING 和 BUSY_SPIN通过大量消耗CPU,获得最大吞吐量和最低1微妙延迟性。

Jdon Framework 6.5.1缺省采取BLOCKING。

基于Jdon Framework 6.5.1的JiveJdon 4.6.1也发布,JiveJdon 4.6已经在jdon.com稳定运行三个月,7x24持续运行,没有死锁,没有内存泄漏。

下图采取Disruptor Sleep策略后的性能测试结果,虽然比较吃CPU,但是吞吐量和响应速度都比较理想:



开源Jdonframework和jivejdon下载地址:http://www.jdon.com/jdonframework/download.html

[该贴被banq于2012-03-10 19:58修改过]

banq
2012-03-10 20:07
有人可能疑问Disruptor是否有重复发明轮子的嫌疑?

这点Martin Fowler已经在其LMAX架构中有关“队列和机制偏爱的缺乏”谈过了,我个人观点是:JDK中NoBlockingLinkedList算法等虽然是经过大师Doug Lea等多年研究推出,但是算法和技术实现是两码事,LMAX团队就是认为NoBlockingLinkedList这些标准算法的API其实在实现机制上有很大缺陷,比如它们要在队列的生产者和消费者之间共享修改一些队列长度等信息,这些又造成了新锁,也就是说,虽然算法理论本身是为了减少锁或回避锁,但是为实现这个算法反而引入更多的锁。

LMAX团队还发现所谓的一些Actor模型无法满足自己应用场景的强力测试,后来发生了改进Scala的Actor模型的Why AsyncFP 引起的一场争论

不要重复发明轮子是对的,这个真知模型是有其场景边界的,如果你的应用对并发性能有无限饥渴,那么这个真知模型就不适用了。

[该贴被banq于2012-03-10 20:16修改过]

minitan
2012-03-12 15:29
有点不太明白Disruptor的使用场景,还有事件的执行状态如何获取,比如提交一个订单,我如何知道订单提交成功了呢?
又如我在论坛发布一个帖子,Disruptor不返回帖子发布的状态,而只能通过查询来确认是否发布成功?那如果在消费端发生异常了呢,是否异常也无法马上获取,只能依靠查看日志什么的?

[该贴被minitan于2012-03-12 15:40修改过]

banq
2012-03-12 15:41
2012年03月12日 15:29 "@minitan"的内容
Disruptor的使用场景 ...


作为并发非堵塞事件的底层机制,非堵塞事件应用很多,比如可以用在Socket Servce,Node.js的Socket使用的事件轮询也属于这种类型,按这里

可以用在业务逻辑处理上,大量数据需要撮合等等,比如订单处理上。

提交一个订单就产生一个订单事件,然后就可以立刻返回成功,等订单事件的消费者处理器真正处理订单了,那是还可以返回成功,当然,也不必这么来回两次,可以象Node.js那样Hold住TCP/IP那样处理。

minitan
2012-03-12 16:05
2012年03月12日 15:41 "@banq"的内容
提交一个订单就产生一个订单事件,然后就可以立刻返回成功,等订单事件的消费者处理器真正处理订单了,那是还可以返回成功,当然,也不必这么来回两次,可以象Node.js那样Hold住TCP/IP那样处理。 ...


比如12306一个订票流程,如果用Disruptor来实现,大致实现过程是怎样?

banq
2012-03-13 08:32
2012年03月12日 16:05 "@minitan"的内容
12306一个订票流程,如果用Disruptor来实现 ...


说起来很费事,如果把订单 票等同于我们现在发的一个帖子,简单情况下可以参考JiveJdon源码,主要是见MessageKernel.java,创建和删除委托给一个threadManagerContext(DCI场景,其中包含Disruptor的事件生产者)。

考虑到12306的并发量极其大,可以升级到LMAX架构,这是一种Event Sourcing架构,专门票据处理器处理票据,和外界通讯都通过Disruptor.可以在一个线程里每秒处理6百万订单。

[该贴被banq于2012-03-13 08:35修改过]

minitan
2012-03-13 11:21
2012年03月13日 08:32 "@banq"的内容
说起来很费事,如果把订单 票等同于我们现在发的一个帖子,简单情况下可以参考JiveJdon源码,主要是见MessageKernel.java,创建和删除委托给一个threadManagerContext(DCI场景,其中包含Disrupto ...


这里有一个问题,就是应用Disruptor不太理解的地方。创建或者删除事件委托给事件生产者以后,就立刻返回结果了,而不用去关心事件是否处理成功或者其最后的执行状态是什么,而只能通过事后的查询类确定事件执行结果。(读写分离?)
传统删除的流程一般是,发起删除操作,业务处理删除,处理完成后返回执行状态,再根据这个状态告诉前端是否删除成功。使用Disruptor后,发起删除事件以后,就直接返回,后面的事情委托给Disruptor来处理,前端是没法获取事件的执行状态的。这个是觉得别扭的地方。
我的问题是,在使用Disruptor异步处理事件之后,事件的执行状态是否需要立即返回(初衷?),如果需要,怎么获取到事件状态,是否有代码实例?

banq
2012-03-14 09:52
2012年03月13日 11:21 "@minitan"的内容
事件的执行状态是否需要立即返回 ...


再用一个RingBuffer就可以返回了,可见JdonFramework源码中EventResultDisruptor等。

使用Disruptor这样非堵塞队列替代JDK中的LinkedList(无论Blocking或NoBlocking),可以起到单机性能的提高,可以在单机吞吐量和延迟之间做一个平衡,无延迟吞吐量不会变大,1微秒延迟可以实现100K TPS的吞吐量,这也是异步架构带来的巨大处理量。关于这个问题,可见这个案例说明

[该贴被banq于2012-03-14 10:09修改过]

minitan
2012-03-14 16:39
2012年03月14日 09:52 "@banq"的内容
再用一个RingBuffer就可以返回了,可见Jdonframework源码中EventResultDisruptor等。
...


终于搞明白了。

withmemores
2012-03-19 14:54
有HELLO WORD 搬的例子么!?>

superallen
2012-04-16 17:36
eventHandler.setSequence(eventProcessor.getSequence());

在 eventHandler里并没有这个方法,这个方法的作用是什么?

flowerknight
2013-01-24 17:59
因项目的原因近一段时间在研究Disruptor,发现几个现象,向LZ求证一下:

我们的测试场景:
1)多线程做为消息事件的生产者,均以单条消息事件的方式发给调度程序;10个线程,每个线程发1000个事件消息;
2)调度程序采用一个一级Buffer用来缓存生产者发送来的消息事件(在放入一级缓存前sleep 2个毫秒,模拟业务处理的耗时);其中这个一级BUFFER分别采用LinkedList、BlockingQueue、Disruptor RingBuffer三种容器进行测试;
3)调度程序每次从一级Buffer获取100条消息事件,放入一个由BlockingQueue构造的二级Buffer,相当于这个Buffer中的每个slot保存的都是一个消息事件集合(Collection);
4)采用ThreadPoolExecutor的方式分配5个子线程,用于处理二级Buffer中的消息事件集合。

在测试Disruptor时,发现:
1)SingleThreadedClaimStrategy策略模式比MultiThreadedLowContentionClaimStrategy策略模式的速度要快。前者用时2.2秒左右,后者用时3.2秒(等待模式均使用YieldingWaitStrategy)。不过,SingleThreadedClaimStrategy模式有丢单的问题,最离谱的情况下,最终只能收到8000个左右的事件;仅修改到MultiThreadedLowContentionClaimStrategy,就可以正常收到全部10000个事件消息。测试过程中RingBuffer Size设置为16384。目前不明。

2)MultiThreadedLowContentionClaimStrategy下测试等待模式对性能影响:
YieldingWaitStrategy:3秒左右
BlockingWaitStrategy:2.2秒左右
SleepingWaitStrategy:2.4秒左右

2)无压力(系统空闲)场景,对比等待模式对CPU的占用情况:(线程策略为MultiThreadedLowContentionClaimStrategy)
YieldingWaitStrategy:CPU资源平均在25%左右;
BlockingWaitStrategy:CPU资源平均在3%左右;
SleepingWaitStrategy:CPU资源平均在8%左右;

banq
2013-01-25 08:38
2013-01-24 17:59 "@flowerknight"的内容
在测试Disruptor时,发现 ...


三个实验结果基本属实,使用Disruptor主要是im-memory两个线程间的数据交换,属于线程并发,如果是一对多的分发,建议用disruptor+RabbitMQ这些消息中间件,对于大量集群分布式式分发比较在行,Disruptor相对来说比较微型,单机式的。

flowerknight
2013-01-25 12:43
针对SingleThreadedClaimStrategy与MultiThreadedLowContentionClaimStrategy进行了复测,目前看:

SingleThreadedClaimStrategy针对单线程做生产者的场景;多线程场景下存在丢单的问题(原理上可能是同一个slot被多线程覆盖了);测试采用1个线程,没有丢单的问题。

MultiThreadedLowContentionClaimStrategy针对多线程做生产者的场景;根据Ringbuffer的设计思想,一个线程在获得pos之前,其他并发线程要等待(或要重新计算的熬下一个可用slot的位置),所以保证了唯一性,没有丢单问题。
但是使用MultiThreadedLowContentionClaimStrategy模拟“多生产者,1个消费者”的场景,性能下降的有些离谱。
10个线程,每个线程发1000个事件,采用MultiThreadedLowContentionClaimStrategy + BlockingWaitStrategy的模式,完成这10000次的事件接收处理,耗时8秒左右。
目前我们已经完成了一轮详细的对比测试,感觉Disruptor的RingBuffer可能不太适合OLTP级别的应用,对于OLAP级别的应用应该还是非常有效果的。不过Disruptor有很多好的想法,值得深入研究。

猜你喜欢
2Go 1 2 下一页