在Java Stream實現大型查詢
Spring JdbcTemplate從1.0版開始就一直在使用這個類,並且它的發展很好,但我希望在版本5中它會包含一些流式處理功能,用於查詢很大資料結果,可惜沒有發生。
儘管如此,有時我需要執行返回數百萬行的查詢,而且我不能使用JdbcTemplate方法來返回列表,RowCallbackHandler非常適合,但是如果收到Stream會好得多,不是嗎?特別是如果你有自定義RowMappers ...
所以,我決定編寫自己的Stream生成器以與JdbcTemplate一起使用。在這個過程中,我最終建立了一個更通用的Stream生成器,我覺得這很好,所以我想與需要類似東西的人分享。
挑戰
首先,我們需要考慮流是惰性的,當你得到一個流並定義要在其上完成的操作時,其實沒有任何事情發生,直到你實現最終操作,它需要實際遍歷元素並應用對它的操作。有些操作遍歷整個流(例如計數或將元素收集到另一個集合中),並且存在短路操作(例如確定是否有任何元素通過某些過濾器)。
因此,我們希望得到一個流,並在其上定義操作,並且沒有任何反應,直到流需要遍歷,然後需要執行查詢(這意味著與資料庫建立開放連線)。如果發生錯誤,查詢需要停止(並且JdbcTemplate將負責清理連線和其他資源)。
我發現我能夠完成這項工作的唯一方法是使用兩個執行緒:一個生成器執行緒,其中執行查詢,行以某種方式提供給流,以及一個消費者執行緒,它是流的讀者。
我們需要一個緩衝區,生產者將在其中儲存元素,消費者將從中獲取元素。LinkedBlockingQueue似乎是完美的。
<b>public</b> <b>static</b> <T> Stream<T> streamForQuery(<b>int</b> bufferSize, T endOfStreamMarker, Consumer<Consumer<T>> query) { <b>final</b> LinkedBlockingQueue<T> queue = <b>new</b> LinkedBlockingQueue<>(bufferSize); <font><i>//This is the consumer that is usually passed to queries;</i></font><font> </font><font><i>//it will receive each item from the query and put it in the queue</i></font><font> Consumer<T> filler = t -> { <b>try</b> { </font><font><i>//Try to add to the queue, waiting up to 1 second</i></font><font> </font><font><i>//Honestly if after 1 second the queue is still full, either the stream consumer</i></font><font> </font><font><i>//needs some serious optimization or, more likely, a short-circuit terminal</i></font><font> </font><font><i>//operation was performed on the stream.</i></font><font> <b>if</b> (!queue.offer(t, 1, TimeUnit.SECONDS)) { </font><font><i>//If the queue is full after 1 second, time out.</i></font><font> </font><font><i>//Throw an exception to stop the producer queue.</i></font><font> log.error(</font><font>"Timeoud waiting to feed elements to stream"</font><font>); <b>throw</b> <b>new</b> BufferOverflowException(); } } <b>catch</b> (InterruptedException ex) { System.err.println(</font><font>"Interrupted trying to add item to stream"</font><font>); ex.printStackTrace(); } }; </font><font><i>//For the stream that we return, we use a Spliterator.</i></font><font> <b>return</b> StreamSupport.stream(() -> <b>new</b> Spliterators.AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED) { </font><font><i>//We need to know if the producer thread has been started</i></font><font> <b>private</b> <b>boolean</b> started = false; </font><font><i>//If there's an exception in the producer, keep it here</i></font><font> <b>private</b> <b>volatile</b> Throwable boom; </font><font><i>/** This method is called once, before advancing to the first element. * It will start the producer thread, which runs the query, passing it our * queue filler. */</i></font><font> <b>private</b> <b>void</b> startProducer() { </font><font><i>//Get the consumer thread</i></font><font> Thread interruptMe = Thread.currentThread(); </font><font><i>//First time this is called it will run the query in a separate thread</i></font><font> </font><font><i>//This is the producer thread</i></font><font> <b>new</b> Thread(() -> { <b>try</b> { </font><font><i>//Run the query, with our special consumer</i></font><font> query.accept(filler); } <b>catch</b> (BufferOverflowException ignore) { </font><font><i>//The filler threw this, means the queue is not being consumed fast enough</i></font><font> </font><font><i>//(or, more likely, not at all)</i></font><font> } <b>catch</b> (Throwable thr) { </font><font><i>//Something bad happened, store the exception and interrupt the reader</i></font><font> boom = thr; interruptMe.interrupt(); } }).start(); started = <b>true</b>; } @Override <b>public</b> <b>boolean</b> tryAdvance(Consumer<? <b>super</b> T> action) { <b>if</b> (!started) { startProducer(); } <b>try</b> { </font><font><i>//Take an item from the queue and if it's not the end of stream maker, pass it</i></font><font> </font><font><i>//to the action consumer.</i></font><font> T t = queue.take(); <b>if</b> (t != endOfStreamMarker) { action.accept(t); <b>return</b> <b>true</b>; } } <b>catch</b> (InterruptedException ex) { <b>if</b> (boom == <b>null</b>) { System.err.println(</font><font>"Interrupted reading from stream"</font><font>); ex.printStackTrace(); } <b>else</b> { </font><font><i>//Throw the exception from the producer on the consumer side</i></font><font> <b>throw</b> <b>new</b> RuntimeException(boom); } } <b>return</b> false; } }, Spliterator.IMMUTABLE, false); } </font>
這就是你使用JdbcTemplate的方式:
<b>final</b> MyRow marker = <b>new</b> MyRow(); Stream<MyRow> stream = streamForQuery(100, marker, callback -> { <font><i>//Pass a RowCallbackHandler that passes a MyRow to the callback</i></font><font> jdbcTemplate.query(</font><font>"SELECT * FROM really_big_table_with_millions_of_rows"</font><font>, rs -> { callback.accept(myRowMapper.mapRow(rs, 0)); } ); </font><font><i>//Pass the marker to the callback, to signal end of stream</i></font><font> callback.accept(marker); }); </font>
此時,尚未執行查詢。你可以做以下事情:
stream = stream.filter(row -> row.isPretty());
但仍然沒有任何反應。當你做這樣的事情時:
Optional<MyRow> row = stream.skip(100_000).limit(1000).findAny();
然後執行查詢,將讀取(並跳過)前十萬行,並且每行將通過過濾器,直到漂亮地讀取了一千行。
請,請,請不要使用它作為良好的WHERE子句和正確索引表的替代品。我主要使用這個東西生成報告,通過將元素對映到一個公共型別進行進一步處理來連線不相交型別的流(基本上,彌補了Java中缺少的聯合型別)。
話雖如此,能夠以流方式從資料庫中讀取行是非常酷的。