disruptor中多线程消费者的疑问

请教下大家,关于disruptor 中的消费者
就拿通俗包装产品做个例子, 生产者生产出产品后,将产品放入 ringBuffer, 然后消费者从ringBuffer中取得产品进行包装。

我理解的 在disruptor中,生产者当然可以是多个,即多线程来处理,多个生产者同时向ringBuffer中填充产品,这个由disruptor来处理并行写入问题。

问题是,在大多数场景中,我们不光需要 并行的生产者,更需要并行的消费者。

而在 disruptor 框架中,如果我们定义了多个 EventHandler 的实例, 这些EventHandler 都会对每个产品 消费一次,
例如:
disruptor.handleEventsWith( new HelloEventHandler("first")
,new HelloEventHandler("second");

执行的结果是:
handler2-----pool-1-thread-2------:0
handler1-----pool-1-thread-1------:0
handler2-----pool-1-thread-2------:1
handler1-----pool-1-thread-1------:1
handler2-----pool-1-thread-2------:2
handler1-----pool-1-thread-1------:2
handler2-----pool-1-thread-2------:3
handler1-----pool-1-thread-1------:3

从上述结果可以看到 , 每一个handler是启动了一个线程来处理,但是其从RingBuffer中取出的数据会重复。

例子是参考:
Disruptor实例:单生产者-多消费者 , 这篇文章的内容: http://www.jdon.com/44868

我本来的目的是希望 HelloEventHandler 的这两个实例可以分别启动线程并行执行,以并行消费产品。

但事实上, 生产者没产出一个产品, 每个消费者实例都会重复的执行一遍。


因此我想问一下,在disruptor 应该如何来实现 有多个消费者线程 同时来获取RingBuffer中的产品,而不会重复执行呢

难道必须在EventHandler中自行判断当前所传递的事件是否已经被消费过才可以的吗?
如果这样的话,消费者中判断是否已处理过产品,岂不又涉及到多线程问题了。

另外也想听取一下 大家对 Disruptor中 启动消费者数量的建议,系统中启动多少生产者,多少消费者 ,有哪些衡量点和参考。

谢谢。
[该贴被oleio于2013-05-04 17:04修改过]

2013-05-04 16:16 "@oleio
"的内容
其从RingBuffer中取出的数据会重复 ...

不会重复,可能是你测试代码有问题。

谢谢 banq 的回复,我指的重复是
disruptor.handleEventsWith( new HelloEventHandler("first")
,new HelloEventHandler("second");
这是两个HelloEventHandler类的实例 ,

运行时 HelloEventHandler("first") 这个业务处理类执行过的 产品,另一个业务处理类 HelloEventHandler("second") ,也会执行一遍。

disruptor是pub-sub模式,
你publish了一个事件,当然是所有的event handler都会响应的啊?这样有什么不对吗?pub-sub这种模式就是广播机制呀。你是不是希望虽然可以有多个event handler,但只能有一个event handler可以执行,也就是争抢着执行?

谢谢回复
是这样的,同类型的 比如 都是 HelloEventHandler 的消费者,希望它们以抢占的方式来消费 RingBuffer 中的产品。
而不同类型的消费者,比如 HelloEventHandler 与 ByeEventHandler ,那么它们就是保持原先的广播机制 来处理。

请问 disruptor 支持这种抢占模式吗?

2013-05-05 14:32 "@oleio
"的内容
希望它们以抢占的方式来消费 RingBuffer 中的产品。 ...

你希望的抢占是什么样的?一般一个HelloEventHandler 类型一个实例啊。

disruptor如果是pub-sub模式的,那就是不支持抢占模式了。LMAX架构中的, journal, replicator, blp三个event handler都会消费ringbuffer中的槽数据的。只是blp不会跑在journal, replicator前面而已。journal, replicator这两个则是完全并行的。ringbuffer中的数据不像queue,会被取出去然后没有了。ringbuffer里的数据永远不会被移除,除非是被publisher覆盖掉。所以,我觉得应该是实现不了你要的抢占竞争执行某个event的功能的。
[该贴被tangxuehua于2013-05-05 15:14修改过]

to banq:
你希望的抢占是什么样的?一般一个HelloEventHandler 类型一个实例啊。

我是希望 一个handler类型 能有多个实例,以便于 消费者可以多线程的来跑,进一步提升性能。

这个可以用SequenceBarrier来实现吧?我觉得其实你不是要抢占,而是希望不同的线程处理ringbuffer里不同的信息,互不争抢,又不遗漏。

我个人的看法是,disruptor模式的产生过程中证明了,无锁可以取得最高的性能,一个线程处理+无锁要比N个线程+有锁快。
所以我想楼主的初衷是希望用多个线程来提高速度,但是实际上这不可避免的引入锁,最终的结果是比只有一个线程处理还要慢。
而且LMAX架构并不是说只有一个disruptor,实际的系统中所有耗时的处理都应该分解并且通过异步的方式(也就是这里的disruptor)进行链接。

这几天,写看了下关于LMAX的架构,看了网上流传的测试代码,也产生了和楼主相同的疑问,比如:当生产者已经发布了多个消息在RingBuffer中,如果此时有多个消费者, 发现所有的消费者都会执行每个Slot的Event事件。例如 写日志、备份数据、和业务逻辑的处理。只是处理的先后顺序不一致。 个人感觉这和LMax所说的不同的消费者执行不同的Slot有些不一样!还是LMAX只是支持这种广播的方式。如果我想多个消费者并行执行RringBuffer ,已经被每个消费者执行的则不再执行能否用disruptor实现!

目前disruptor的消费的业务逻辑代码 都写在其onEvent的回调函数里,没有发现能改变其下标(cursor)的方法。想知道有没有朋友遇到和我一样的问题,刚刚接触Disruptor许多地方不是特别的了解。

2013-11-22 14:58 "@tecentID9347F"的内容
如果我想多个消费者并行执行RringBuffer ,已经被每个消费者执行的则不再执行能否用disruptor实现! ...

Disruptor的特点就是将多线程通过ringbuffer变成单线程,从而实现单写操作:http://www.jdon.com/performance/singlewriter.html

实际操作中测试还发现,当生产者发布了多个消息之后,消费者只是再单独的线程中运行,我想知道 是不是只有超线程技术的机子才能支持他说的哪个并发。

2013-11-22 15:19 "@banq"的内容
Disruptor的特点就是将多线程通过ringbuffer变成单线程,从而实现单写操作:http://www.jdon.com/performance/singlewriter.html ...

非常感谢您,抽时间看我的评论,看了单写的哪个操作。(Disruptor的特点是将多线程生产者通过ringbuffer变成单线程消费者,通过单线程消费者对共享资源进行写操作)但是这样的话,如果并发量比较少,disruptor 的优势没有启用多个线程去跑任务消耗的时间快!这样Disruptor的优势又在哪里。