使用Disruptor 遇到问题,eventHander可能读不到事件

代码使用disruptor 3.2.1, 使用多生产者单消费者模式,目的是为把并发写通过disruptor转成单写

刚开始没问题, 运行一会后,publish event 后,evenHandler不会被调用了,不知是disruptor问题,还是我代码的问题,但我把disruptor 换成jdk的ArrayBlockingQueue,没出现问题。定位了很久,没找出原因,以下是代码,没有多少,就两个类,希望有disruptor 使用经验的道友帮忙分析一下

启动disruptor且发布事件的类:


public class CommandDisruptorServiceImpl implements CommandHandleService {
private static final EventTranslatorVararg<CommandEvent> TRANSLATOR = new EventTranslatorVararg<CommandEvent>() {
@Override
public void translateTo(CommandEvent event, long sequence,
Object... args) {
if (args.length >= 3) {
event.setCommandExecutor((CommandExecutor) args[0]);
event.setCommand((Command<?>) args[1]);
event.setCommandResult((CommandResult) args[2]);
}
}
};

private Logger log = Logger.getLogger(CommandDisruptorServiceImpl.class);
private volatile Disruptor<CommandEvent> disruptor;
private volatile boolean started = false;

@Override
public void start(int bufSize) {
if (null == disruptor) {
synchronized (this) {
if (null == disruptor) {
disruptor = new Disruptor<CommandEvent>(new CommandEventFactory(),
bufSize,
Executors.newCachedThreadPool(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread s = Executors.defaultThreadFactory()
.newThread(r);
s.setName("command-handle-thread");
s.setDaemon(true);
return s;
}
}));

disruptor.handleEventsWith(new CommandEventHandler());
disruptor.start();
started = true;
log.info(
"disruptor started!");
}
}
}
}

@Override
public void stop() {
if (disruptor != null) {
disruptor.shutdown();
started = false;
disruptor = null;
log.info(
"disruptor stopted");
}
}

@Override
public <T> T postCommandAndWaitResult(CommandExecutor next,
Command<T> command) {
final RingBuffer<CommandEvent> ringBuffer = disruptor.getRingBuffer();
final CommandResult commandResult = new CommandResult();
ringBuffer.publishEvent(TRANSLATOR, next, command, commandResult);
log.info(
"Command pulish to ringbuffer! cursor:" +
disruptor.getCursor());
return waitCommandResult(commandResult, 2000);
}

@SuppressWarnings(
"unchecked")
private <T> T waitCommandResult(CommandResult commandResult, long timeOut) {
synchronized (commandResult) {
try {
while (!commandResult.isCommandIsFinish()) {
log.info(
"Command waiting for result!");
commandResult.wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}

log.debug(
"commandResult:" + commandResult.getResult());
return (T) commandResult.getResult();
}

@Override
public boolean isStarted() {
return started;
}
}


eventHandler 类:


public class CommandEventHandler implements EventHandler<CommandEvent> {
private Logger log = Logger.getLogger(CommandEventHandler.class);

@Override
public void onEvent(CommandEvent event, long sequence, boolean endOfBatch)
throws Exception {
log.info("enter commandHandler! sequence:" + sequence + " endOfBatch:" +
endOfBatch);

Command<?> command = event.getCommand();
CommandExecutor cmdInterceptor = event.getCommandExecutor();
final CommandResult commandResult = event.getCommandResult();

try {
commandResult.setResult(cmdInterceptor.execute(command));
} finally {
synchronized (commandResult) {
commandResult.setCommandIsFinish(true);
log.info(
"to notify wait command!");
commandResult.notify();
}
}
}
}

我也看了一下disruptor 中 BatchEventProcessor 的 run方法源码
onEvent 不被执行有两种可能
1. sequenceBarrier.waitFor(nextSequence); 一直在等待
2. nextSequence <= availableSequence 这个条件一直不被满足

run方法源码:


@Override
/<strong>
* It is ok to have another thread rerun this method after a halt().
*
* @throws IllegalStateException if this object instance is already running in a thread
*/
@Override
public void run() {
if (!running.compareAndSet(false, true)) {
throw new IllegalStateException("Thread is already running");
}

sequenceBarrier.clearAlert();
notifyStart();
T event = null;
long nextSequence = sequence.get() + 1L;

try {
while (true) {
try {
/</strong>
* onEvent 不被执行有两种可能
*1. sequenceBarrier.waitFor(nextSequence); 一直在等待
*2. nextSequence <= availableSequence 这个条件一直不被满足
*/
final long availableSequence = sequenceBarrier.waitFor(nextSequence);

while (nextSequence <= availableSequence) {
event = dataProvider.get(nextSequence);
eventHandler.onEvent(event, nextSequence,
nextSequence == availableSequence);
nextSequence++;
}

sequence.set(availableSequence);
} catch (final TimeoutException e) {
notifyTimeout(sequence.get());
} catch (final AlertException ex) {
if (!running.get()) {
break;
}
} catch (final Throwable ex) {
exceptionHandler.handleEventException(ex, nextSequence,
event);
sequence.set(nextSequence);
nextSequence++;
}
}
} finally {
notifyShutdown();
running.set(false);
}
}

[该贴被abaddoncoder于2015-05-25 19:10修改过]
[该贴被abaddoncoder于2015-05-25 19:15修改过]
[该贴被abaddoncoder于2015-05-25 19:19修改过]
[该贴被abaddoncoder于2015-05-25 19:21修改过]