public static <T> Stream<T> streamForQuery(int bufferSize, T endOfStreamMarker, Consumer<Consumer<T>> query) { final LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>(bufferSize); //This is the consumer that is usually passed to queries; //it will receive each item from the query and put it in the queue Consumer<T> filler = t -> { try { //Try to add to the queue, waiting up to 1 second //Honestly if after 1 second the queue is still full, either the stream consumer //needs some serious optimization or, more likely, a short-circuit terminal //operation was performed on the stream. if (!queue.offer(t, 1, TimeUnit.SECONDS)) { //If the queue is full after 1 second, time out. //Throw an exception to stop the producer queue. log.error("Timeoud waiting to feed elements to stream"); throw new BufferOverflowException(); } } catch (InterruptedException ex) { System.err.println("Interrupted trying to add item to stream"); ex.printStackTrace(); } }; //For the stream that we return, we use a Spliterator. return StreamSupport.stream(() -> new Spliterators.AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED) { //We need to know if the producer thread has been started private boolean started = false; //If there's an exception in the producer, keep it here private volatile Throwable boom; /** This method is called once, before advancing to the first element. * It will start the producer thread, which runs the query, passing it our * queue filler. */ private void startProducer() { //Get the consumer thread Thread interruptMe = Thread.currentThread(); //First time this is called it will run the query in a separate thread //This is the producer thread new Thread(() -> { try { //Run the query, with our special consumer query.accept(filler); } catch (BufferOverflowException ignore) { //The filler threw this, means the queue is not being consumed fast enough //(or, more likely, not at all) } catch (Throwable thr) { //Something bad happened, store the exception and interrupt the reader boom = thr; interruptMe.interrupt(); } }).start(); started = true; } @Override public boolean tryAdvance(Consumer<? super T> action) { if (!started) { startProducer(); } try { //Take an item from the queue and if it's not the end of stream maker, pass it //to the action consumer. T t = queue.take(); if (t != endOfStreamMarker) { action.accept(t); return true; } } catch (InterruptedException ex) { if (boom == null) { System.err.println("Interrupted reading from stream"); ex.printStackTrace(); } else { //Throw the exception from the producer on the consumer side throw new RuntimeException(boom); } } return false; } }, Spliterator.IMMUTABLE, false); }
|