使用Disruptor 遇到问题,eventHander可能读不到事件
15-05-25
abaddoncoder
代码使用disruptor 3.2.1, 使用多生产者单消费者模式,目的是为把并发写通过disruptor转成单写
刚开始没问题, 运行一会后,publish event 后,evenHandler不会被调用了,不知是disruptor问题,还是我代码的问题,但我把disruptor 换成jdk的ArrayBlockingQueue,没出现问题。定位了很久,没找出原因,以下是代码,没有多少,就两个类,希望有disruptor 使用经验的道友帮忙分析一下
启动disruptor且发布事件的类:
eventHandler 类:
我也看了一下disruptor 中 BatchEventProcessor 的 run方法源码
onEvent 不被执行有两种可能
1. sequenceBarrier.waitFor(nextSequence); 一直在等待
2. nextSequence <= availableSequence 这个条件一直不被满足
run方法源码:
刚开始没问题, 运行一会后,publish event 后,evenHandler不会被调用了,不知是disruptor问题,还是我代码的问题,但我把disruptor 换成jdk的ArrayBlockingQueue,没出现问题。定位了很久,没找出原因,以下是代码,没有多少,就两个类,希望有disruptor 使用经验的道友帮忙分析一下
启动disruptor且发布事件的类:
public class CommandDisruptorServiceImpl implements CommandHandleService { private static final EventTranslatorVararg<CommandEvent> TRANSLATOR = new EventTranslatorVararg<CommandEvent>() { @Override public void translateTo(CommandEvent event, long sequence, Object... args) { if (args.length >= 3) { event.setCommandExecutor((CommandExecutor) args[0]); event.setCommand((Command<?>) args[1]); event.setCommandResult((CommandResult) args[2]); } } }; private Logger log = Logger.getLogger(CommandDisruptorServiceImpl.class); private volatile Disruptor<CommandEvent> disruptor; private volatile boolean started = false; @Override public void start(int bufSize) { if (null == disruptor) { synchronized (this) { if (null == disruptor) { disruptor = new Disruptor<CommandEvent>(new CommandEventFactory(), bufSize, Executors.newCachedThreadPool(new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread s = Executors.defaultThreadFactory() .newThread(r); s.setName("command-handle-thread"); s.setDaemon(true); return s; } })); disruptor.handleEventsWith(new CommandEventHandler()); disruptor.start(); started = true; log.info("disruptor started!"); } } } } @Override public void stop() { if (disruptor != null) { disruptor.shutdown(); started = false; disruptor = null; log.info("disruptor stopted"); } } @Override public <T> T postCommandAndWaitResult(CommandExecutor next, Command<T> command) { final RingBuffer<CommandEvent> ringBuffer = disruptor.getRingBuffer(); final CommandResult commandResult = new CommandResult(); ringBuffer.publishEvent(TRANSLATOR, next, command, commandResult); log.info("Command pulish to ringbuffer! cursor:" + disruptor.getCursor()); return waitCommandResult(commandResult, 2000); } @SuppressWarnings("unchecked") private <T> T waitCommandResult(CommandResult commandResult, long timeOut) { synchronized (commandResult) { try { while (!commandResult.isCommandIsFinish()) { log.info("Command waiting for result!"); commandResult.wait(); } } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("commandResult:" + commandResult.getResult()); return (T) commandResult.getResult(); } @Override public boolean isStarted() { return started; } } <p class="indent"> |
eventHandler 类:
public class CommandEventHandler implements EventHandler<CommandEvent> { private Logger log = Logger.getLogger(CommandEventHandler.class); @Override public void onEvent(CommandEvent event, long sequence, boolean endOfBatch) throws Exception { log.info("enter commandHandler! sequence:" + sequence + " endOfBatch:" + endOfBatch); Command<?> command = event.getCommand(); CommandExecutor cmdInterceptor = event.getCommandExecutor(); final CommandResult commandResult = event.getCommandResult(); try { commandResult.setResult(cmdInterceptor.execute(command)); } finally { synchronized (commandResult) { commandResult.setCommandIsFinish(true); log.info("to notify wait command!"); commandResult.notify(); } } } } <p class="indent"> |
我也看了一下disruptor 中 BatchEventProcessor 的 run方法源码
onEvent 不被执行有两种可能
1. sequenceBarrier.waitFor(nextSequence); 一直在等待
2. nextSequence <= availableSequence 这个条件一直不被满足
run方法源码:
@Override /** * It is ok to have another thread rerun this method after a halt(). * * @throws IllegalStateException if this object instance is already running in a thread */ @Override public void run() { if (!running.compareAndSet(false, true)) { throw new IllegalStateException("Thread is already running"); } sequenceBarrier.clearAlert(); notifyStart(); T event = null; long nextSequence = sequence.get() + 1L; try { while (true) { try { /** * onEvent 不被执行有两种可能 *1. sequenceBarrier.waitFor(nextSequence); 一直在等待 *2. nextSequence <= availableSequence 这个条件一直不被满足 */ final long availableSequence = sequenceBarrier.waitFor(nextSequence); while (nextSequence <= availableSequence) { event = dataProvider.get(nextSequence); eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence); nextSequence++; } sequence.set(availableSequence); } catch (final TimeoutException e) { notifyTimeout(sequence.get()); } catch (final AlertException ex) { if (!running.get()) { break; } } catch (final Throwable ex) { exceptionHandler.handleEventException(ex, nextSequence, event); sequence.set(nextSequence); nextSequence++; } } } finally { notifyShutdown(); running.set(false); } } <p class="indent"> |
[该贴被abaddoncoder于2015-05-25 19:10修改过]
[该贴被abaddoncoder于2015-05-25 19:15修改过]
[该贴被abaddoncoder于2015-05-25 19:19修改过]
[该贴被abaddoncoder于2015-05-25 19:21修改过]