Problem using Disruptor: the EventHandler may stop receiving events

15-05-25 abaddoncoder

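The code uses Disruptor 3.2.1 in multi-producer, single-consumer mode. The goal is to turn concurrent writes into a single writer by routing them through the Disruptor.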

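It works at first, but after running for a while the EventHandler is no longer called after an event is published. I can't tell whether this is a Disruptor problem or a problem in my code, but when I replaced the Disruptor with the JDK's ArrayBlockingQueue the problem did not occur. I have spent a long time trying to track this down without finding the cause. The code is below; there isn't much of it, just two classes. I hope someone with Disruptor experience can help analyze it.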
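For comparison, here is a minimal sketch of what an ArrayBlockingQueue-based version of the same "many callers, one writer thread" idea could look like. It is not the code I actually ran; the class name `SingleWriterQueueSketch` and the use of `FutureTask` to carry the result back are assumptions made for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

// Hypothetical sketch: many producer threads enqueue commands,
// one daemon thread executes them sequentially.
public class SingleWriterQueueSketch {
    private final BlockingQueue<FutureTask<?>> queue;

    public SingleWriterQueueSketch(int capacity) {
        queue = new ArrayBlockingQueue<FutureTask<?>>(capacity);
        Thread writer = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    while (true) {
                        // FutureTask.run() stores any exception in the task,
                        // so a failing command does not end this loop.
                        queue.take().run();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "command-handle-thread");
        writer.setDaemon(true);
        writer.start();
    }

    // Blocks the caller until the single writer thread has run the command.
    public <T> T postCommandAndWaitResult(Callable<T> command) throws Exception {
        FutureTask<T> task = new FutureTask<T>(command);
        queue.put(task);
        return task.get(); // throws ExecutionException if the command failed
    }
}
```

One property of this sketch worth noting: an exception thrown by a command is reported to the caller through `task.get()` and the consumer thread keeps running.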

The class that starts the Disruptor and publishes events:

public class CommandDisruptorServiceImpl implements CommandHandleService {
    private static final EventTranslatorVararg<CommandEvent> TRANSLATOR = new EventTranslatorVararg<CommandEvent>() {
            @Override
            public void translateTo(CommandEvent event, long sequence,
                Object... args) {
                if (args.length >= 3) {
                    event.setCommandExecutor((CommandExecutor) args[0]);
                    event.setCommand((Command<?>) args[1]);
                    event.setCommandResult((CommandResult) args[2]);
                }
            }
        };

    private Logger log = Logger.getLogger(CommandDisruptorServiceImpl.class);
    private volatile Disruptor<CommandEvent> disruptor;
    private volatile boolean started = false;

    @Override
    public void start(int bufSize) {
        if (null == disruptor) {
            synchronized (this) {
                if (null == disruptor) {
                    disruptor = new Disruptor<CommandEvent>(new CommandEventFactory(),
                            bufSize,
                            Executors.newCachedThreadPool(new ThreadFactory() {
                                @Override
                                public Thread newThread(Runnable r) {
                                    Thread s = Executors.defaultThreadFactory()
                                                        .newThread(r);
                                    s.setName("command-handle-thread");
                                    s.setDaemon(true);
                                    return s;
                                }
                            }));

                    disruptor.handleEventsWith(new CommandEventHandler());
                    disruptor.start();
                    started = true;
                    log.info("disruptor started!");
                }
            }
        }
    }

    @Override
    public void stop() {
        if (disruptor != null) {
            disruptor.shutdown();
            started = false;
            disruptor = null;
            log.info("disruptor stopped");
        }
    }

    @Override
    public <T> T postCommandAndWaitResult(CommandExecutor next,
        Command<T> command) {
        final RingBuffer<CommandEvent> ringBuffer = disruptor.getRingBuffer();
        final CommandResult commandResult = new CommandResult();
        ringBuffer.publishEvent(TRANSLATOR, next, command, commandResult);
        log.info("Command published to ringbuffer! cursor:" +
            disruptor.getCursor());
        return waitCommandResult(commandResult, 2000);
    }

    @SuppressWarnings("unchecked")
    private <T> T waitCommandResult(CommandResult commandResult, long timeOut) {
        synchronized (commandResult) {
            try {
                while (!commandResult.isCommandIsFinish()) {
                    log.info("Command waiting for result!");
                    commandResult.wait();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        log.debug("commandResult:" + commandResult.getResult());
        return (T) commandResult.getResult();
    }

    @Override
    public boolean isStarted() {
        return started;
    }
}
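One thing visible in the class above: `waitCommandResult` accepts a `timeOut` argument but calls `commandResult.wait()` with no timeout, so if the handler never runs the caller blocks forever. A minimal sketch of a bounded wait is shown below; `TimedResultSketch` and its methods are hypothetical names for illustration, not part of the code above, and the exception used is `java.util.concurrent.TimeoutException`, not Disruptor's own class of the same name.

```java
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for CommandResult with a bounded wait.
public class TimedResultSketch {
    private Object result;
    private boolean finished;

    // Called by the consumer thread once the command has been executed.
    public synchronized void complete(Object value) {
        result = value;
        finished = true;
        notifyAll();
    }

    // Called by the producer thread; gives up after timeoutMillis.
    public synchronized Object await(long timeoutMillis)
        throws InterruptedException, TimeoutException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (!finished) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                throw new TimeoutException("no result within " + timeoutMillis + " ms");
            }
            // remaining is strictly positive here, so this is never wait(0),
            // which would block indefinitely.
            wait(remaining);
        }
        return result;
    }
}
```

With a bounded wait the producer at least gets an error it can log, instead of hanging silently when the consumer side has stopped.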



The EventHandler class:

public class CommandEventHandler implements EventHandler<CommandEvent> {
    private Logger log = Logger.getLogger(CommandEventHandler.class);

    @Override
    public void onEvent(CommandEvent event, long sequence, boolean endOfBatch)
        throws Exception {
        log.info("enter commandHandler! sequence:" + sequence + " endOfBatch:" +
            endOfBatch);

        Command<?> command = event.getCommand();
        CommandExecutor cmdInterceptor = event.getCommandExecutor();
        final CommandResult commandResult = event.getCommandResult();

        try {
            commandResult.setResult(cmdInterceptor.execute(command));
        } finally {
            synchronized (commandResult) {
                commandResult.setCommandIsFinish(true);
                log.info("to notify wait command!");
                commandResult.notify();
            }
        }
    }
}


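I also looked at the source of the run method of BatchEventProcessor in Disruptor.

There are two ways onEvent could end up never being executed:

1. sequenceBarrier.waitFor(nextSequence); keeps waiting forever

2. the condition nextSequence <= availableSequence is never satisfied

Source of the run method: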

    /**
     * It is ok to have another thread rerun this method after a halt().
     *
     * @throws IllegalStateException if this object instance is already running in a thread
     */
    @Override
    public void run() {
        if (!running.compareAndSet(false, true)) {
            throw new IllegalStateException("Thread is already running");
        }

        sequenceBarrier.clearAlert();
        notifyStart();
        T event = null;
        long nextSequence = sequence.get() + 1L;

        try {
            while (true) {
                try {
                    /*
                     * There are two ways onEvent could end up never being executed:
                     * 1. sequenceBarrier.waitFor(nextSequence) keeps waiting forever
                     * 2. nextSequence <= availableSequence is never satisfied
                     */
                    final long availableSequence = sequenceBarrier.waitFor(nextSequence);

                    while (nextSequence <= availableSequence) {
                        event = dataProvider.get(nextSequence);
                        eventHandler.onEvent(event, nextSequence,
                            nextSequence == availableSequence);
                        nextSequence++;
                    }

                    sequence.set(availableSequence);
                } catch (final TimeoutException e) {
                    notifyTimeout(sequence.get());
                } catch (final AlertException ex) {
                    if (!running.get()) {
                        break;
                    }
                } catch (final Throwable ex) {
                    exceptionHandler.handleEventException(ex, nextSequence,
                        event);
                    sequence.set(nextSequence);
                    nextSequence++;
                }
            }
        } finally {
            notifyShutdown();
            running.set(false);
        }
    }
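Besides the two cases above, the run method has a third path that may be worth ruling out: the `catch (final Throwable ex)` branch hands the exception to `exceptionHandler.handleEventException(...)`. If that handler rethrows (as far as I know, Disruptor's `FatalExceptionHandler`, used when no handler is configured, logs the error and throws a `RuntimeException`), the exception leaves the `while (true)` loop and the processor thread ends, so later events are published but never handled. In `CommandEventHandler`, anything thrown by `cmdInterceptor.execute(command)` propagates out of `onEvent`. The sketch below simulates this with plain JDK code; `ProcessorLoopSketch`, `Handler` and `ErrorHandler` are hypothetical names and this is not Disruptor's implementation.

```java
// Hypothetical, JDK-only simulation of a processing loop whose
// error handler may or may not rethrow.
public class ProcessorLoopSketch {
    interface Handler {
        void onEvent(int event) throws Exception;
    }

    interface ErrorHandler {
        void handle(Throwable ex, int event);
    }

    // Mimics a handler that treats every exception as fatal.
    static final ErrorHandler RETHROW = new ErrorHandler() {
        @Override
        public void handle(Throwable ex, int event) {
            throw new RuntimeException(ex);
        }
    };

    // Mimics a handler that records the failure and lets the loop go on.
    static final ErrorHandler LOG_AND_CONTINUE = new ErrorHandler() {
        @Override
        public void handle(Throwable ex, int event) {
            System.err.println("event " + event + " failed: " + ex);
        }
    };

    // Returns how many events reached the handler before the loop ended.
    static int runLoop(int[] events, Handler handler, ErrorHandler errors) {
        int invoked = 0;
        try {
            for (int event : events) {
                try {
                    invoked++;
                    handler.onEvent(event);
                } catch (Throwable ex) {
                    errors.handle(ex, event); // may rethrow
                }
            }
        } catch (RuntimeException fatal) {
            // In a real processor thread this would escape run() and end the thread.
        }
        return invoked;
    }
}
```

If this turns out to be the cause, registering a non-rethrowing handler through `disruptor.handleExceptionsWith(...)` before `start()`, or catching exceptions inside `onEvent`, should keep the consumer alive. I have not verified this against the code above.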

[This post was edited by abaddoncoder on 2015-05-25 19:10]

[This post was edited by abaddoncoder on 2015-05-25 19:15]

[This post was edited by abaddoncoder on 2015-05-25 19:19]

[This post was edited by abaddoncoder on 2015-05-25 19:21]