Disruptor example: single producer, multiple consumers

I'll skip the single-producer, single-consumer case. The single-producer, multiple-consumer model looks like this:


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.SingleThreadedClaimStrategy;
import com.lmax.disruptor.YieldingWaitStrategy;

public class OnePublisherToTwoProcessor {
    private static final int BUFFER_SIZE = 1024;

    // Single-producer ring buffer; consumers busy-spin with Thread.yield() while waiting
    private final RingBuffer<StockEvent> ringBuffer = new RingBuffer<StockEvent>(
            StockEvent.EVENT_FACTORY,
            new SingleThreadedClaimStrategy(BUFFER_SIZE),
            new YieldingWaitStrategy());
    private final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();

    private final StockEventHandler[] handlers = new StockEventHandler[2];
    {
        handlers[0] = new StockEventHandler("handler1");
        handlers[1] = new StockEventHandler("handler2");
    }

    private final ExecutorService EXECUTOR = Executors.newFixedThreadPool(2);

    private final BatchEventProcessor<?>[] batchEventProcessors = new BatchEventProcessor[2];
    {
        // Both processors share one barrier but each keeps its own sequence
        batchEventProcessors[0] = new BatchEventProcessor<StockEvent>(ringBuffer, sequenceBarrier, handlers[0]);
        batchEventProcessors[1] = new BatchEventProcessor<StockEvent>(ringBuffer, sequenceBarrier, handlers[1]);
        // The producer must not overwrite slots that either consumer has yet to read
        ringBuffer.setGatingSequences(batchEventProcessors[0].getSequence(),
                batchEventProcessors[1].getSequence());
    }

    public OnePublisherToTwoProcessor() {
        startCon();
        startPro();
    }

    // Start the two consumers
    public void startCon() {
        EXECUTOR.submit(batchEventProcessors[0]);
        EXECUTOR.submit(batchEventProcessors[1]);
    }

    // Start the single producer thread
    void startPro() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                int number = 1;
                long price = 10;
                while (true) {
                    StockInfo stockInfo = new StockInfo();
                    stockInfo.setNumber(number++);
                    stockInfo.setPrice(++price);
                    // Claim the next available slot
                    long pos = ringBuffer.next();
                    // Fetch that slot and fill it in
                    ringBuffer.get(pos).setStockInfo(stockInfo);
                    // Publish the change so the consumers can see it
                    ringBuffer.publish(pos);
                    try {
                        // Sleep for a random interval of up to one second
                        double interval = Math.random();
                        Thread.sleep((long) (1000 * interval));
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and stop producing
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }).start();
    }

    public static void main(String[] args) {
        new OnePublisherToTwoProcessor();
    }
}
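
The listing uses StockEvent, StockInfo and StockEventHandler without showing them. They are not part of Disruptor, so you have to write them yourself. Here is a rough sketch of what they might look like, inferred only from how the listing calls them. To keep it compilable without the library, the nested EventFactory and EventHandler interfaces are stand-ins of my own; in real code you would implement the com.lmax.disruptor interfaces instead, and the exact onEvent signature depends on your Disruptor version, so treat the one below as an assumption. The describe method is also just a hypothetical helper.

```java
public class StockEventSketch {

    // Stand-ins (assumption) for com.lmax.disruptor.EventFactory / EventHandler,
    // declared here only so the sketch compiles without the library.
    interface EventFactory<T> {
        T newInstance();
    }

    interface EventHandler<T> {
        void onEvent(T event, long sequence, boolean endOfBatch) throws Exception;
    }

    // Plain payload; field types follow the setters called in the listing.
    static class StockInfo {
        private int number;
        private long price;

        public int getNumber() { return number; }
        public void setNumber(int number) { this.number = number; }
        public long getPrice() { return price; }
        public void setPrice(long price) { this.price = price; }
    }

    // Mutable slot object: the ring buffer creates these up front through
    // EVENT_FACTORY and reuses them, so the producer only swaps the payload.
    static class StockEvent {
        static final EventFactory<StockEvent> EVENT_FACTORY = new EventFactory<StockEvent>() {
            @Override
            public StockEvent newInstance() {
                return new StockEvent();
            }
        };

        private StockInfo stockInfo;

        public StockInfo getStockInfo() { return stockInfo; }
        public void setStockInfo(StockInfo stockInfo) { this.stockInfo = stockInfo; }
    }

    // Consumer-side handler; the name only tells the two handlers apart in the output.
    static class StockEventHandler implements EventHandler<StockEvent> {
        private final String name;

        StockEventHandler(String name) { this.name = name; }

        // Hypothetical helper that builds the line printed for one event.
        String describe(StockEvent event) {
            StockInfo info = event.getStockInfo();
            return name + ": number=" + info.getNumber() + ", price=" + info.getPrice();
        }

        @Override
        public void onEvent(StockEvent event, long sequence, boolean endOfBatch) {
            System.out.println(describe(event));
        }
    }

    public static void main(String[] args) throws Exception {
        StockEvent event = StockEvent.EVENT_FACTORY.newInstance();
        StockInfo info = new StockInfo();
        info.setNumber(1);
        info.setPrice(11);
        event.setStockInfo(info);
        new StockEventHandler("handler1").onEvent(event, 0L, true);
    }
}
```

In a real project each class would normally live in its own file; they are nested here only to keep the sketch in one piece.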

With this code, the same event gets processed twice!
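
That matches how the listing is wired: each BatchEventProcessor keeps its own sequence, so every handler walks over every published event. The toy simulation below is not Disruptor code, just a plain-Java sketch with made-up names (Consumer, drain, run) showing why independent per-consumer positions over a shared buffer give each consumer the full stream. If each event should be handled by only one consumer, the consumers would need to share a single position instead; Disruptor versions that ship WorkerPool and WorkHandler are, as far as I know, intended for that case.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BroadcastSketch {

    // Each consumer tracks its own position, much like each
    // BatchEventProcessor owns a separate Sequence.
    static class Consumer {
        final String name;
        long sequence = -1; // index of the last entry this consumer processed
        final List<String> seen = new ArrayList<String>();

        Consumer(String name) { this.name = name; }

        // Process everything between this consumer's position and the cursor.
        void drain(List<String> buffer, long cursor) {
            while (sequence < cursor) {
                sequence++;
                seen.add(buffer.get((int) sequence));
            }
        }
    }

    static List<Consumer> run() {
        // Three "published" events; cursor is the index of the last one.
        List<String> buffer = Arrays.asList("event-0", "event-1", "event-2");
        long cursor = buffer.size() - 1;

        Consumer c1 = new Consumer("handler1");
        Consumer c2 = new Consumer("handler2");
        // Neither consumer's progress affects the other, so both see all events.
        c1.drain(buffer, cursor);
        c2.drain(buffer, cursor);
        return Arrays.asList(c1, c2);
    }

    public static void main(String[] args) {
        for (Consumer c : run()) {
            System.out.println(c.name + " saw " + c.seen);
        }
    }
}
```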

I'm a beginner. Do I have to create StockEvent myself?