单生产者-单消费者模型就不列举了,如下为单生产者-多消费者模型:
public class OnePublisherToTwoProcessor { private static final int BUFFER_SIZE = 1024; private final RingBuffer<StockEvent> ringBuffer = new RingBuffer<StockEvent>( StockEvent.EVENT_FACTORY, new SingleThreadedClaimStrategy( BUFFER_SIZE), new YieldingWaitStrategy()); private final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); private final StockEventHandler[] handlers = new StockEventHandler[2]; { handlers[0] = new StockEventHandler("handler1"); handlers[1] = new StockEventHandler("handler2"); } private final ExecutorService EXECUTOR = Executors.newFixedThreadPool(2); private final BatchEventProcessor<?>[] batchEventProcessors = new BatchEventProcessor[2]; { batchEventProcessors[0] = new BatchEventProcessor<StockEvent>(ringBuffer, sequenceBarrier, handlers[0]); batchEventProcessors[1] = new BatchEventProcessor<StockEvent>(ringBuffer, sequenceBarrier, handlers[1]); ringBuffer.setGatingSequences(batchEventProcessors[0].getSequence(),batchEventProcessors[1].getSequence()); }
public OnePublisherToTwoProcessor() { startCon(); startPro(); }
public void startCon() { EXECUTOR.submit(batchEventProcessors[0]); EXECUTOR.submit(batchEventProcessors[1]); }
void startPro() { new Thread(new Runnable() { @[author]Override[/author] public void run() { int number = 1; long price = 10; while (true) { StockInfo stockInfo = new StockInfo(); stockInfo.setNumber(number++); stockInfo.setPrice(++price); // 获取下一个可用的slot long pos = ringBuffer.next(); // 获得该slot ringBuffer.get(pos).setStockInfo(stockInfo); // 提交修改 ringBuffer.publish(pos); try { double interval = Math.random(); Thread.sleep((long) (1000 * interval)); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); }
public static void main(String[] args) { new OnePublisherToTwoProcessor(); } }
|