disruptor多生产者使用问题。
我使用disruptor做性能测试。
程序结构是这样的,我使用异步servlet接收请求,请求接收后就调用ForwardService.publish()方法将请求事件放入disruptor里,disruptor的消费者只是简单的结束servlet请求。
我现在用jemeter起200个线程,不停地发送请求到服务器上,刚开始是正常的。但是运行了一段时间后,tps迅速将下来,cpu达到100%。
代码如下。我在代码里加了一些输出语句,发现
System.out.println(ringBuffer.hasAvailableCapacity(500)); 这句一直输出true,也就是说ringbuffer并没有满。而刚开始System.out.println("public time:"+ (System.currentTimeMillis() - start));这句输出是0,但是一段时间后public time就暴增,增加到几秒。是我的代码有问题,还是disruptor不能支持太多的生产者并发使用?麻烦banq指点一下。
个人猜想可能的原因是生产者执行long sequence = ringBuffer.next();获得sequence之后,线程被切换,由于它还没有publish,所以后面其他生产者线程都在busy wait等待,结果导致性能骤降。
public class ForwardService {
private static Logger logger = Logger.getLogger(ForwardService.class);
private static final int ENTRIES = 1024*8;
private static final ExecutorService executorService;
private static final Disruptor<ForwardEvent> disruptor;
private static final RingBuffer<ForwardEvent> ringBuffer;
static {
executorService = new ThreadPoolExecutor(10, 100, 60L,
TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
// executorService = Executors.newFixedThreadPool(100);
WaitStrategy waitStrategy = new SleepingWaitStrategy();
ClaimStrategy claimStrategy = new MultiThreadedClaimStrategy(ENTRIES);
disruptor = new Disruptor<ForwardEvent>(ForwardEvent.FACTORY,executorService,claimStrategy,waitStrategy);
ForwardHandler forwardHandler = new ForwardHandler();
// OutputHandler outputHandler = new OutputHandler();
disruptor.handleEventsWith(forwardHandler);
disruptor.handleExceptionsFor(forwardHandler).with(
new IgnoreExceptionHandler());
// disruptor.after(forwardHandler).handleEventsWith(outputHandler);
ringBuffer = disruptor.getRingBuffer();
disruptor.start();
}
public static void publish(HttpServletRequest req,
HttpServletResponse resp, Continuation continuation) {
// long cursor = ringBuffer.getCursor();
System.out.println(ringBuffer.hasAvailableCapacity(500));
long sequence = ringBuffer.next();
long start = System.currentTimeMillis();
ForwardEvent event= ringBuffer.get(sequence);
event.setReq(req);
event.setResp(resp);
event.setContinuation(continuation);
event.setTimeBeforePublish(start);
ringBuffer.publish(sequence);
System.out.println("public time:"
+ (System.currentTimeMillis() - start));
}
[该贴被chenchuanfeng001于2012-02-06 16:01修改过]
[该贴被chenchuanfeng001于2012-02-06 16:03修改过]