在Java Stream实现大量数据查询


Spring JdbcTemplate从1.0版开始就一直在使用这个类,并且它的发展很好,但我希望在版本5中它会包含一些流式处理功能,用于查询很大数据结果,可惜没有发生。
尽管如此,有时我需要执行返回数百万行的查询,而且我不能使用JdbcTemplate方法来返回列表,RowCallbackHandler非常适合,但是如果收到Stream会好得多,不是吗?特别是如果你有自定义RowMappers ...
所以,我决定编写自己的Stream生成器以与JdbcTemplate一起使用。在这个过程中,我最终创建了一个更通用的Stream生成器,我觉得这很好,所以我想与需要类似东西的人分享。

挑战
首先,我们需要考虑流是惰性的,当你得到一个流并定义要在其上完成的操作时,其实没有任何事情发生,直到你实现最终操作,它需要实际遍历元素并应用对它的操作。有些操作遍历整个流(例如计数或将元素收集到另一个集合中),并且存在短路操作(例如确定是否有任何元素通过某些过滤器)。
因此,我们希望得到一个流,并在其上定义操作,并且没有任何反应,直到流需要遍历,然后需要运行查询(这意味着与数据库建立开放连接)。如果发生错误,查询需要停止(并且JdbcTemplate将负责清理连接和其他资源)。
我发现我能够完成这项工作的唯一方法是使用两个线程:一个生成器线程,其中运行查询,行以某种方式提供给流,以及一个消费者线程,它是流的读者。
我们需要一个缓冲区,生产者将在其中存储元素,消费者将从中获取元素。LinkedBlockingQueue似乎是完美的。
 

public static <T> Stream<T> streamForQuery(int bufferSize, T endOfStreamMarker,
                                               Consumer<Consumer<T>> query) {
        final LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>(bufferSize);
        //This is the consumer that is usually passed to queries;
       
//it will receive each item from the query and put it in the queue
        Consumer<T> filler = t -> {
            try {
               
//Try to add to the queue, waiting up to 1 second
               
//Honestly if after 1 second the queue is still full, either the stream consumer
               
//needs some serious optimization or, more likely, a short-circuit terminal
               
//operation was performed on the stream.
                if (!queue.offer(t, 1, TimeUnit.SECONDS)) {
                   
//If the queue is full after 1 second, time out.
                   
//Throw an exception to stop the producer queue.
                    log.error(
"Timeoud waiting to feed elements to stream");
                    throw new BufferOverflowException();
                }
            } catch (InterruptedException ex) {
                System.err.println(
"Interrupted trying to add item to stream");
                ex.printStackTrace();
            }
        };
       
//For the stream that we return, we use a Spliterator.
        return StreamSupport.stream(() -> new Spliterators.AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED) {
           
//We need to know if the producer thread has been started
            private boolean started = false;
           
//If there's an exception in the producer, keep it here
            private volatile Throwable boom;
           
/** This method is called once, before advancing to the first element.
             * It will start the producer thread, which runs the query, passing it our
             * queue filler.
             */

            private void startProducer() {
               
//Get the consumer thread
                Thread interruptMe = Thread.currentThread();
               
//First time this is called it will run the query in a separate thread
               
//This is the producer thread
                new Thread(() -> {
                    try {
                       
//Run the query, with our special consumer
                        query.accept(filler);
                    } catch (BufferOverflowException ignore) {
                       
//The filler threw this, means the queue is not being consumed fast enough
                       
//(or, more likely, not at all)
                    } catch (Throwable thr) {
                       
//Something bad happened, store the exception and interrupt the reader
                        boom = thr;
                        interruptMe.interrupt();
                    }
                }).start();
                started = true;
            }
            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                if (!started) {
                    startProducer();
                }
                try {
                   
//Take an item from the queue and if it's not the end of stream maker, pass it
                   
//to the action consumer.
                    T t = queue.take();
                    if (t != endOfStreamMarker) {
                        action.accept(t);
                        return true;
                    }
                } catch (InterruptedException ex) {
                    if (boom == null) {
                        System.err.println(
"Interrupted reading from stream");
                        ex.printStackTrace();
                    } else {
                       
//Throw the exception from the producer on the consumer side
                        throw new RuntimeException(boom);
                    }
                }
                return false;
            }
        }, Spliterator.IMMUTABLE, false);
    }


这就是你使用JdbcTemplate的方式:

final MyRow marker = new MyRow();
Stream<MyRow> stream = streamForQuery(100, marker, callback -> {
    //Pass a RowCallbackHandler that passes a MyRow to the callback
    jdbcTemplate.query(
"SELECT * FROM really_big_table_with_millions_of_rows",
                       rs -> { callback.accept(myRowMapper.mapRow(rs, 0)); }
    );
   
//Pass the marker to the callback, to signal end of stream
    callback.accept(marker);
});

此时,尚未执行查询。你可以做以下事情:

stream = stream.filter(row -> row.isPretty());

但仍然没有任何反应。当你做这样的事情时:
Optional<MyRow> row = stream.skip(100_000).limit(1000).findAny();

然后执行查询,将读取(并跳过)前十万行,并且每行将通过过滤器,直到漂亮地读取了一千行。

请,请,请不要使用它作为良好的WHERE子句和正确索引表的替代品。我主要使用这个东西生成报告,通过将元素映射到一个公共类型进行进一步处理来连接不相交类型的流(基本上,弥补了Java中缺少的联合类型)。
话虽如此,能够以流方式从数据库中读取行是非常酷的。