代码使用disruptor 3.2.1, 使用多生产者单消费者模式,目的是为把并发写通过disruptor转成单写
刚开始没问题, 运行一会后,publish event 后,evenHandler不会被调用了,不知是disruptor问题,还是我代码的问题,但我把disruptor 换成jdk的ArrayBlockingQueue,没出现问题。定位了很久,没找出原因,以下是代码,没有多少,就两个类,希望有disruptor 使用经验的道友帮忙分析一下
启动disruptor且发布事件的类:
| public class CommandDisruptorServiceImpl implements CommandHandleService {
 private static final EventTranslatorVararg<CommandEvent> TRANSLATOR = new EventTranslatorVararg<CommandEvent>() {
 @Override
 public void translateTo(CommandEvent event, long sequence,
 Object... args) {
 if (args.length >= 3) {
 event.setCommandExecutor((CommandExecutor) args[0]);
 event.setCommand((Command<?>) args[1]);
 event.setCommandResult((CommandResult) args[2]);
 }
 }
 };
 
 private Logger log = Logger.getLogger(CommandDisruptorServiceImpl.class);
 private volatile Disruptor<CommandEvent> disruptor;
 private volatile boolean started = false;
 
 @Override
 public void start(int bufSize) {
 if (null == disruptor) {
 synchronized (this) {
 if (null == disruptor) {
 disruptor = new Disruptor<CommandEvent>(new CommandEventFactory(),
 bufSize,
 Executors.newCachedThreadPool(new ThreadFactory() {
 @Override
 public Thread newThread(Runnable r) {
 Thread s = Executors.defaultThreadFactory()
 .newThread(r);
 s.setName("command-handle-thread");
 s.setDaemon(true);
 return s;
 }
 }));
 
 disruptor.handleEventsWith(new CommandEventHandler());
 disruptor.start();
 started = true;
 log.info("disruptor started!");
 }
 }
 }
 }
 
 @Override
 public void stop() {
 if (disruptor != null) {
 disruptor.shutdown();
 started = false;
 disruptor = null;
 log.info("disruptor stopted");
 }
 }
 
 @Override
 public <T> T postCommandAndWaitResult(CommandExecutor next,
 Command<T> command) {
 final RingBuffer<CommandEvent> ringBuffer = disruptor.getRingBuffer();
 final CommandResult commandResult = new CommandResult();
 ringBuffer.publishEvent(TRANSLATOR, next, command, commandResult);
 log.info("Command pulish to ringbuffer! cursor:" +
 disruptor.getCursor());
 return waitCommandResult(commandResult, 2000);
 }
 
 @SuppressWarnings("unchecked")
 private <T> T waitCommandResult(CommandResult commandResult, long timeOut) {
 synchronized (commandResult) {
 try {
 while (!commandResult.isCommandIsFinish()) {
 log.info("Command waiting for result!");
 commandResult.wait();
 }
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
 
 log.debug("commandResult:" + commandResult.getResult());
 return (T) commandResult.getResult();
 }
 
 @Override
 public boolean isStarted() {
 return started;
 }
 }
 
 
 
 | 
eventHandler 类:
| public class CommandEventHandler implements EventHandler<CommandEvent> {
 private Logger log = Logger.getLogger(CommandEventHandler.class);
 
 @Override
 public void onEvent(CommandEvent event, long sequence, boolean endOfBatch)
 throws Exception {
 log.info("enter commandHandler! sequence:" + sequence + " endOfBatch:" +
 endOfBatch);
 
 Command<?> command = event.getCommand();
 CommandExecutor cmdInterceptor = event.getCommandExecutor();
 final CommandResult commandResult = event.getCommandResult();
 
 try {
 commandResult.setResult(cmdInterceptor.execute(command));
 } finally {
 synchronized (commandResult) {
 commandResult.setCommandIsFinish(true);
 log.info("to notify wait command!");
 commandResult.notify();
 }
 }
 }
 }
 
 
 | 
我也看了一下disruptor 中 BatchEventProcessor 的 run方法源码
onEvent 不被执行有两种可能
1. sequenceBarrier.waitFor(nextSequence); 一直在等待
2. nextSequence <= availableSequence 这个条件一直不被满足
 run方法源码:
| @Override
 /<strong>
 * It is ok to have another thread rerun this method after a halt().
 *
 * @throws IllegalStateException if this object instance is already running in a thread
 */
 @Override
 public void run() {
 if (!running.compareAndSet(false, true)) {
 throw new IllegalStateException("Thread is already running");
 }
 
 sequenceBarrier.clearAlert();
 notifyStart();
 T event = null;
 long nextSequence = sequence.get() + 1L;
 
 try {
 while (true) {
 try {
 /</strong>
 * onEvent 不被执行有两种可能
 *1. sequenceBarrier.waitFor(nextSequence); 一直在等待
 *2. nextSequence <= availableSequence 这个条件一直不被满足
 */
 final long availableSequence = sequenceBarrier.waitFor(nextSequence);
 
 while (nextSequence <= availableSequence) {
 event = dataProvider.get(nextSequence);
 eventHandler.onEvent(event, nextSequence,
 nextSequence == availableSequence);
 nextSequence++;
 }
 
 sequence.set(availableSequence);
 } catch (final TimeoutException e) {
 notifyTimeout(sequence.get());
 } catch (final AlertException ex) {
 if (!running.get()) {
 break;
 }
 } catch (final Throwable ex) {
 exceptionHandler.handleEventException(ex, nextSequence,
 event);
 sequence.set(nextSequence);
 nextSequence++;
 }
 }
 } finally {
 notifyShutdown();
 running.set(false);
 }
 }
 
 | 
[该贴被abaddoncoder于2015-05-25 19:10修改过]
[该贴被abaddoncoder于2015-05-25 19:15修改过]
[该贴被abaddoncoder于2015-05-25 19:19修改过]
[该贴被abaddoncoder于2015-05-25 19:21修改过]