public class ForwardService { private static Logger logger = Logger.getLogger(ForwardService.class); private static final int ENTRIES = 1024*8;
private static final ExecutorService executorService; private static final Disruptor<ForwardEvent> disruptor; private static final RingBuffer<ForwardEvent> ringBuffer;
static { executorService = new ThreadPoolExecutor(10, 100, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); // executorService = Executors.newFixedThreadPool(100); WaitStrategy waitStrategy = new SleepingWaitStrategy(); ClaimStrategy claimStrategy = new MultiThreadedClaimStrategy(ENTRIES); disruptor = new Disruptor<ForwardEvent>(ForwardEvent.FACTORY,executorService,claimStrategy,waitStrategy); ForwardHandler forwardHandler = new ForwardHandler(); // OutputHandler outputHandler = new OutputHandler(); disruptor.handleEventsWith(forwardHandler); disruptor.handleExceptionsFor(forwardHandler).with( new IgnoreExceptionHandler()); // disruptor.after(forwardHandler).handleEventsWith(outputHandler); ringBuffer = disruptor.getRingBuffer(); disruptor.start(); }
public static void publish(HttpServletRequest req, HttpServletResponse resp, Continuation continuation) { // long cursor = ringBuffer.getCursor(); System.out.println(ringBuffer.hasAvailableCapacity(500)); long sequence = ringBuffer.next(); long start = System.currentTimeMillis(); ForwardEvent event= ringBuffer.get(sequence); event.setReq(req); event.setResp(resp); event.setContinuation(continuation); event.setTimeBeforePublish(start); ringBuffer.publish(sequence); System.out.println("public time:" + (System.currentTimeMillis() - start)); }
追踪disruptor源码进去,publish方法最后就是一个繁忙等待,应该没有什么参数可以设置。 @Override public void serialisePublishing(final long sequence, final Sequence cursor, final int batchSize) { final long expectedSequence = sequence - batchSize; while (expectedSequence != cursor.get()) { // busy spin }