disruptor - 并发编程框架

disruptor - Concurrent Programming Framework 并发编程框架

disruptor发布了Java的2.0版本(.Net版本见这里),disruptor是一个高性能的异步处理框架,或者可以认为是最快的消息框架(轻量的JMS),也可以认为是一个观察者模式实现,或者事件-监听模式的实现,直接称disruptor模式。

disruptor最大特点是高性能,其LMAX架构可以获得每秒6百万订单,用1微秒的延迟获得吞吐量为100K+。

disruptor与传统高性能模型是不同的,LMAX团队通过测试发现热门的Actor模型在高并发设计有瓶颈,disruptor的RingBuffer根据多核CPU的高速缓存设计特点进行了优化,让每个CPU运行一个线程,多个CPU就是多线程并发模式了,正如团队所言:我们想出一个更好,更快的线程之间共享数据的方式,不与世界分享将是自私的,不共享知识让我们看上去是死聪明。

传统消息框架使用Queue队列,如JDK LinkedList等数据结构实现,RingBuffer比Linked之类数据结构要快,因为没有锁,是CPU友好型的。另外一个不同的地方是不会在清除RingBuffer中数据,只会覆盖,这样降低了垃圾回收机制启动频率。

使用案例代码:


DisruptorWizard<MyEvent> dw = new DisruptorWizard<MyEvent>(MyEvent.FACTORY, 32, Executors.newCachedThreadPool());
EventHandler<MyEvent> handler1 = new EventHandler<MyEvent>() {
public void onEvent(MyEvent event, boolean endOfBatch) throws Exception {
System.out.println("MyEvent=" + event.r);
}

};
EventHandler<MyEvent> handler2 = new EventHandler<MyEvent>() {
public void onEvent(MyEvent event, boolean endOfBatch) throws Exception {
System.out.println(
"MyEvent=" + event.getResult());
}

};
dw.handleEventsWith(handler1);
dw.after(handler1).handleEventsWith(handler2);

RingBuffer ringBuffer = dw.start();


MyEvent event = (MyEvent) ringBuffer.nextEvent();
event.setValue(60);
ringBuffer.publish(event);

或者:

SampleExecutor executor = new SampleExecutor();
RingBuffer<MyEvent> ringBuffer = new RingBuffer<MyEvent>(MyEvent.FACTORY, 4, ClaimStrategy.Option.SINGLE_THREADED,
WaitStrategy.Option.YIELDING);
MyBatchHandler batchHandler = new MyBatchHandler();

DependencyBarrier dependencyBarrier = ringBuffer.newDependencyBarrier();
BatchEventProcessor<MyEvent> batchProcessorFizz = new BatchEventProcessor<MyEvent>(ringBuffer, dependencyBarrier, batchHandler);
executor.execute(batchProcessorFizz);

MyEvent event = ringBuffer.nextEvent();
event.setValue(60);
ringBuffer.publish(event);


相关主题:
JVM伪共享
Disruptor系列文档
Martin Fowler的LMAX架构
Disruptor为什么这么快?锁是坏的

[该贴被banq于2011-09-07 18:58修改过]

Disruptor没有像JDK的LinkedBlockQueue等那样使用锁,针对CPU高速缓存进行了优化。

原来我们以为多个线程同时写一个类的字段会发生争夺,这是多线程基本原理,所以使用了锁机制,保证这个共用字段(资源)能够某个时刻只能一个线程写,但是这样做的坏处是:有可能发生死锁。

比如1号线程先后访问共享资源A和B;而2号线程先后访问共享资源B和A,因为在资源A和资源B都有锁,那么1号在访问资源A时,资源A上锁了,准备访问资源B,但是无法访问,因为与此同时;而2号线程在访问资源B,资源B锁着呢,正准备访问资源A,发现资源A被1号线程锁着呢,结果彼此无限等待彼此下去,死锁类似逻辑上自指悖论。

所以,锁是坏的,破坏性能,锁是并发计算的大敌。

我们回到队列上,一把一个队列有至少两个线程:生产者和消费者,这就具备了资源争夺的前提,这两个线程一般彼此守在队列的进出两端,表面上好像没有访问共享资源,实际上队列存在两个共享资源:队列大小或指针.

除了共享资源写操作上存在资源争夺问题外,Disruptor的LMAX团队发现Java或C在多核CPU情况下有伪共享问题
CPU会把数据从内存加载到高速缓存中 ,这样可以获得更好的性能,高速缓存默认大小是64 Byte为一个区域,CPU机制限制只能一个CPU的一个线程访问(写)这个高速缓存区。

CPU在将主内存中数据加载到高速缓冲时,如果发现被加载的数据不足64字节,那么就会加载多个数据,以填满自己的64字节,悲催就发生了,恰恰otspot JVM中对象指针等大小都不会超过64字节,这样一个高速缓冲中可能加载了两个对象指针,一个CPU一个高速缓冲,双核就是两个CPU各自一个高速缓冲,那么两个高速缓冲中各有两个对象指针,都是指向相同的两个对象。

因为一个CPU只能访问(写)自己高速缓存区中数据,相当于给这个数据加锁,那么另外一个CPU同时访问自己高速缓冲中同样数据时将会被锁定不能访问。

这就发生与锁机制类似的性能陷进,Disruptor的解决办法是填满高速缓冲的64字节,不是对象指针等数据不够64字节吗?那么加一些字节填满64字节,这样CPU将数据加载到高速缓冲时,就只能加载一个了,刚刚好啊。

所以,尽管两个线程是在写两个不同的字段值,也会因为双核CPU底层机制发生伪装的共享,并没有真正共享,其实还是排他性的独享。

现在我们大概知道RingBuffer是个什么东东了:
1.ring buffer是一个大的数组.
2.RingBuffer里所有指针都是Java longs (64字节) 不断永远向前计数,如后面图,不断在圆环中循环。
3.RingBuffer只有当前序列号,没有终点序列号,其中数据不会被取出后消除,这样以便实现从过去某个序列号到当前序列号的重放,这样当消费者说没有接受到生产者发送的消息,生产者还可以再次发送,这点是一种原子性的“事务”机制。

鉴于Disruptor如此革命性的优点,JdonFramework 6.4新版 采取Disruptor作为其Domain Events实现机制。 即可以方便简单享用Disruptor的新特点;又能根据自己的要求继续深化使用Disruptor,最极致的情况如运行的LMAX系统一样,每秒处理6百万个订单。

[该贴被admin于2011-09-08 16:06修改过]
[该贴被admin于2011-12-22 10:56修改过]


如题

那这不就跟cpu结构耦合了么。。。。?
如果是在单核CPU上,是不是就跟普通的并发方式性能差不多了?

600万订单,确实很快,神话级的框架

二楼图片无效,无法看到

还是不明白它是怎么不用锁的.这里也只说了伪共享的问题.

2011年12月28日 17:01 "@yaoqis"的内容
还是不明白它是怎么不用锁的 ...

普通linkedlist需要锁是因为要知道producer端是否放入,producer会对这个全局变量不断修改,而修改会需要锁,Disruptor则是Ringbuffer这个循环不断增加的环形,来回循环会覆盖以前的。

看了Lmax和disruptor的相关文章有些东西没弄懂。
1.Lmax的架构是有很多web服务器接收外部请求以及一台逻辑处理器处理商用逻辑?
2.在逻辑处理服务器上有input Disruptor和output Disruptor。
input Disruptor的数据是从哪里来的?如果是从web服务器过来的,就是说这台服务器需要通过诸如http连接等方式从web服务器获取数据,那input Disruptor的生产者就必须是多线程的?逻辑处理完毕后结果输出到output Disruptor后又是怎么给到客户端的?
3.如果我的系统使用disruptor,那么哪些会阻塞的操作应该怎么办?
放到生产者哪里还是消费者哪里?

2012年02月01日 14:33 "@chenchuanfeng001"的内容
input Disruptor的数据是从哪里来的?如果是从web服务器过来的,那input Disruptor的生产者就必须是多线程的?逻辑处理完毕后结果输出到outpu ...

Disruptor的生产者来自前台Web服务器,更前端是客户操作,客户任何操作通过http化为事件,事件的含义就是非堵塞的,可以用多线程实现;

逻辑处理完毕后输出output,注意有时逻辑处理需要其他一些协调处理,这时也是通过output触发其他子系统,其他子系统处理完毕后,放入input中,再继续接着处理直至完毕。

使用Disruptor就是为了防止堵塞,如果有堵塞延迟,应该发生在Disruptor这里,其他地方是通畅的。

还是有些不明白。
input Distuptor的生产者是单线程还是多线程?
如果数据是从web服务器过来的,那么该生产者就应该是多线程的(因为它要和多台web服务器连接),并且读取数据的时候还会发生io阻塞。而disruptor的消费者是单线程的,那么阻塞的生产者线程不会影响商业逻辑的处理效率吗?

2012年02月01日 15:36 "@chenchuanfeng001"的内容
那么该生产者就应该是多线程的(因为它要和多台web服务器连接),并且读取数据的时候还会发生io阻塞。而disruptor的消费者是单线程的 ...

这中间通过一种队列方式,Disruptor是通过Ringbuffer,几乎同时到达的事件进入Ringbuffer,被单线程一个个迅速取走。

我的web服务器需要访问数据库,如果是原来的ssh架构,那么访问数据库的时候servlet线程就会阻塞等待。
如果我现在使用异步servlet,把请求扔到disruptor里,然后在disruptor的消费者线程里再进行数据库访问操作,这样会提升服务器的性能吗?

2012年02月01日 15:50 "@chenchuanfeng001"的内容
访问数据库的时候servlet线程就会阻塞等待 ...

Servlet线程堵塞是果,不是因,因是由于你的数据库处理繁忙,无法及时处理Servlet线程,导致Servlet线程堵塞,这种情况使用Disruptor没有明显效果。

关机时提高数据库处理能力,其中一种性价比高,可扩展可伸缩的方案就是用内存软件计算替代或分担数据库自身处理能力,或者将数据库查询计算能力与其存储能力分离,一个SQL语句又排序又存储,很显然耦合性太强,会拖累数据库处理能力,将排序或其他计算改写到内存中进行,通过Disruptor输出,触发数据库再进行存储。

推荐这篇文章:非推倒重来式的读/写伸缩扩展

2012年02月01日 15:39 "@banq"的内容
这中间通过一种队列方式,Disruptor是通过Ringbuffer,几乎同时到达的事件进入Ringbuffer,被单线程一个个迅速取走。 ...

对,这在一篇blog中提到,lmax是用消息队列来收集web端请求的,这个消息队列也是唯一的生产者。