disruptor多生产者使用问题。

我使用disruptor做性能测试。
程序结构是这样的,我使用异步servlet接收请求,请求接收后就调用ForwardService.publish()方法将请求事件放入disruptor里,disruptor的消费者只是简单的结束servlet请求。
我现在用jemeter起200个线程,不停地发送请求到服务器上,刚开始是正常的。但是运行了一段时间后,tps迅速将下来,cpu达到100%。

代码如下。我在代码里加了一些输出语句,发现
System.out.println(ringBuffer.hasAvailableCapacity(500)); 这句一直输出true,也就是说ringbuffer并没有满。而刚开始System.out.println("public time:"+ (System.currentTimeMillis() - start));这句输出是0,但是一段时间后public time就暴增,增加到几秒。是我的代码有问题,还是disruptor不能支持太多的生产者并发使用?麻烦banq指点一下。

个人猜想可能的原因是生产者执行long sequence = ringBuffer.next();获得sequence之后,线程被切换,由于它还没有publish,所以后面其他生产者线程都在busy wait等待,结果导致性能骤降。

public class ForwardService {
private static Logger logger = Logger.getLogger(ForwardService.class);
private static final int ENTRIES = 1024*8;

private static final ExecutorService executorService;
private static final Disruptor<ForwardEvent> disruptor;
private static final RingBuffer<ForwardEvent> ringBuffer;

static {
executorService = new ThreadPoolExecutor(10, 100, 60L,
TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
// executorService = Executors.newFixedThreadPool(100);
WaitStrategy waitStrategy = new SleepingWaitStrategy();
ClaimStrategy claimStrategy = new MultiThreadedClaimStrategy(ENTRIES);
disruptor = new Disruptor<ForwardEvent>(ForwardEvent.FACTORY,executorService,claimStrategy,waitStrategy);
ForwardHandler forwardHandler = new ForwardHandler();
// OutputHandler outputHandler = new OutputHandler();
disruptor.handleEventsWith(forwardHandler);
disruptor.handleExceptionsFor(forwardHandler).with(
new IgnoreExceptionHandler());
// disruptor.after(forwardHandler).handleEventsWith(outputHandler);
ringBuffer = disruptor.getRingBuffer();
disruptor.start();
}

public static void publish(HttpServletRequest req,
HttpServletResponse resp, Continuation continuation) {
// long cursor = ringBuffer.getCursor();
System.out.println(ringBuffer.hasAvailableCapacity(500));
long sequence = ringBuffer.next();
long start = System.currentTimeMillis();
ForwardEvent event= ringBuffer.get(sequence);
event.setReq(req);
event.setResp(resp);
event.setContinuation(continuation);
event.setTimeBeforePublish(start);
ringBuffer.publish(sequence);
System.out.println("public time:"
+ (System.currentTimeMillis() - start));
}
[该贴被chenchuanfeng001于2012-02-06 16:01修改过]
[该贴被chenchuanfeng001于2012-02-06 16:03修改过]

disruptor有支持一个线程的生产者方式和支持多个线程生产者的方式,有一个参数好像要设置一下,你查看一下手册。

2012年02月06日 17:08 "@banq"的内容
disruptor有支持一个线程的生产者方式和支持多个线程生产者的方式,有一个参数好像要设置一下,你查看一下手册。 ...

好像没有可以设置的。

按我的理解disruptor在太多生产者的情况下会出问题。
因为disruptor的生产者获取sequence之后,必须按顺序将事件publish到disruptor。现在有200个生产者线程,如果第一个squence=1的在publish之前切换了,那么后面的线程都publish不了,都在等待。结果是第一个线程必须等到200次线程切换后才能做publish,第二个线程最高要等400次(因为第一个线程publish之后不一定会轮到他),第3个线程最高要600次....。这样导致整个服务器接近瘫痪。

我看了disruptor的测试代码,里面最多只有3个生产者。也就是说它们最高线程切换等待的时间只有9次,所以基本没影响。不知道我这样理解对不对?

追踪disruptor源码进去,publish方法最后就是一个繁忙等待,应该没有什么参数可以设置。
@Override
public void serialisePublishing(final long sequence, final Sequence cursor, final int batchSize)
{
final long expectedSequence = sequence - batchSize;
while (expectedSequence != cursor.get())
{
// busy spin
}

cursor.set(sequence);
}


在其2.5中增加多线程claimbackoff的策略,防止因为事件没来得及被处理导致buffer满了,我估计是不是这。
另外它的使用方式有些怪,甚至是唯一的,特别是它的disruptor类。我测试过其以前的disruptorwizard类,它里面有一个锁等待,我去除后才测试正常,否则闲着也是占据50%
你说得的publisher切换是有这回事,实际上就是手动增数1,否则无法发出去。
[该贴被banq于2012-02-06 19:00修改过]

问题解决了。
刚好昨天disruptor发布了2.8版本解决了这个问题。
现在有了新的MultithreadClaimStrategy类,原来的改名叫做MultithreadLowContentionClaimStrategy。新类里面serialisePublishing方法不只是简单的阻塞等待,解决了上面的问题。
来源:http://groups.google.com/group/lmax-disruptor/browse_thread/thread/1eed316ecc19c621

2012年02月07日 09:03 "@chenchuanfeng001"的内容
问题解决了。
刚好昨天disruptor发布了2.8版本解决了这个问题。 ...

好样的,弄不好是你的发现促使他们fixed bug呢。

赞一个。不过根据diruptor的工作原理,使用过多的producer会降低性能的,最好只使用一个.

2012年02月07日 22:21 "@jeffreyqu"的内容
不过根据diruptor的工作原理,使用过多的producer会降低性能的,最好只使用一个. ...

但是一般web请求都是多线程的,怎么弄成单生产者呢?

关于disruptor的使用还有些不明白。
一般情况下,系统会产生各种事件,我是把这些事件发布到一个disruptor呢,还是发布的不同的disruptor?
如果是同一个disruptor,由于消费者是从Event中获取数据,而不同事件需要不同数据,那么Event类怎么处理?难道使它包含很多属性来满足各种消费者的需求?
如果用不同disruptor,那么每增加一个功能就要增加一个disruptor?
[该贴被chenchuanfeng001于2012-02-08 15:39修改过]

2012年02月08日 15:30 "@chenchuanfeng001"的内容
一般情况下,系统会产生各种事件,我是把这些事件发布到一个disruptor呢,还是发布的不同的disruptor?
如果是同一个disruptor,由于消费者是从Event中获取数据,而不同事件需要不同数据,那么Event类怎么处理?难道 ...

根据官方文档给出的性能数据,单台服务器一个disruptor就足够处理所有事件了。可以只设一个producer,设3个consumer。最终的性能取决于用于持久化操作的consummer的性能。
Disruptor的内部实际上是个数组,放一个Event接口,不同的Event都实现这个接口。

2012年02月08日 14:39 "@chenchuanfeng001"的内容
但是一般web请求都是多线程的,怎么弄成单生产者呢? ...

我们现在用RabbitMQ实现了一种RPC式的调用,http://www.rabbitmq.com/tutorials/tutorial-six-python.html

遇到同样的问题
使用了WorkerPool, 将AsyncContext发布到了ringbuffer,发现QPS比原有服务低了600,无论如何都打压不上去了。

2012-08-27 10:13 "@cintana"的内容
发布到了ringbuffer ...

RingBuffer的等待策略很重要,大吞吐量的则在空闲时刻占据CPU很高,因为Disruptor要保证CPU线程的控制权,这样在需要时才不会被挤压。