原來你是這樣的 Stream:淺析 Java Stream 實現原理
作者 | 任旭東
杏仁後端攻城獅,關注服務端技術和敏捷開發。
Stream 為什麼會出現?
Stream 出現之前,遍歷一個集合最傳統的做法大概是用 Iterator,或者 for 迴圈。這種兩種方式都屬於外部迭代,然而外部迭代存在著一些問題。
-
開發者需要自己手寫迭代的邏輯,雖然大部分場景迭代邏輯都是每個元素遍歷一次。
-
如果存在像排序這樣的有狀態的中間操作,不得不進行多次迭代。
-
多次迭代會增加臨時變數,從而導致記憶體的浪費。
雖然 Java 5 引入的 foreach 解決了部分問題,但也引入了新的問題。
-
foreach 遍歷不能對元素進行賦值操作
-
遍歷的時候,只有當前被遍歷的元素可見,其他不可見
隨著大資料的興起,傳統的遍歷方式已經無法滿足開發者的需求。
就像小作坊發展到一定程度要變成大工廠才能滿足市場需求一樣。大工廠和小作坊除了規模變大、工人不多之外,最大的區別就是多了流水線。流水線可以將工人們更高效的組織起來,使得生產力有質的飛躍。
所以不安於現狀的開發者們想要開發一種更便捷,更實用的特性。
-
它可以像流水線一樣來處理資料
-
它應該相容常用的集合
-
它的編碼應該更簡潔
-
它應該具有更高的可讀性
-
它可以提供對資料集合的常規操作
-
它可以拼裝不同的操作
經過不懈的能力,Stream 就誕生了。加上 lambda 表示式的加成,簡直是如虎添翼。
你可以用 Stream 幹什麼?
下面以簡單的需求為例,看一下 Stream 的優勢:
從一列單詞中選出以字母a開頭的單詞,按字母排序後返回前3個。
外部迭代實現方式
List<String> list = Lists.newArrayList("are", "where", "advance", "anvato", "java", "abc");List < String > tempList = Lists . newArrayList
();
List < String > result = Lists . newArrayList();
for ( int i = 0 ; i < list . size (); i ++) { if ( list . get ( i ). startsWith ( "a" )) { tempList . add ( list . get ( i ));}
}
tempList . sort ( Comparator . naturalOrder());
result = tempList . subList ( 0 , 3);
result . forEach ( System . out :: println );stream實現方式
List<String> list = Lists.newArrayList("are", "where", "anvato", "java", "abc"); list.stream().filter(s -> s.startsWith("a")).sorted().limit(3) .collect(Collectors.toList()).forEach(System.out::println);
Stream 是怎麼實現的?
需要解決的問題:
-
如何定義流水線?
-
原料如何流入?
-
如何讓流水線上的工人將處理過的原料交給下一個工人?
-
流水線何時開始執行?
-
流水線何時結束執行?
總觀全域性
Stream 處理資料的過程可以類別成工廠的流水線。資料可以看做流水線上的原料,對資料的操作可以看做流水線上的工人對原料的操作。
事實上 Stream 只是一個介面,並沒有操作的預設實現。最主要的實現是 ReferencePipeline
,而 ReferencePipeline
繼承自 AbstractPipeline
, AbstractPipeline
實現了 BaseStream 介面並實現了它的方法。但 ReferencePipeline
仍然是一個抽象類,因為它並沒有實現所有的抽象方法,比如 AbstractPipeline
中的 opWrapSink
。 ReferencePipeline
內部定義了三個靜態內部類,分別是: Head, StatelessOp, StatefulOp
,但只有 Head 不再是抽象類。
流水線的結構有點像雙向連結串列,節點之間通過引用連線。節點可以分為三類,控制資料輸入的節點、操作資料的中間節點和控制資料輸出的節點。
ReferencePipeline 包含了控制資料流入的 Head ,中間操作 StatelessOp, StatefulOp
,終止操作 TerminalOp
。
Stream 常用的流操作包括:
-
中間操作(Intermediate Operations)
-
無狀態(Stateless)操作:每個資料的處理是獨立的,不會影響或依賴之前的資料。如
filter()
、flatMap()
、flatMapToDouble()
、flatMapToInt()
、flatMapToLong()
、map()
、mapToDouble()
、mapToInt()
、mapToLong()
、peek()
、unordered()
等 -
有狀態(Stateful)操作:處理時會記錄狀態,比如處理了幾個。後面元素的處理會依賴前面記錄的狀態,或者拿到所有元素才能繼續下去。如
distinct()
、sorted()
、sorted(comparator)
、limit()
、skip()
等 -
終止操作(Terminal Operations)
-
非短路操作:處理完所有資料才能得到結果。如
collect()
、count()
、forEach()
、forEachOrdered()
、max()
、min()
、reduce()
、toArray()
等。 -
短路(short-circuiting)操作:拿到符合預期的結果就會停下來,不一定會處理完所有資料。如
anyMatch()
、allMatch()
、noneMatch()
、findFirst()
、findAny()
等。
原始碼分析
瞭解了流水線的結構和定義,接下來我們基於上面的例子逐步看一下原始碼。
定義輸入源
stream()
是 Collection 中的 default 方法,實際上呼叫的是 StreamSupport.stream()
方法,返回的是 ReferencePipeline.Head
的例項。
ReferencePipeline.Head
的建構函式傳遞是 ArrayList 中實現的 spliterator 。常用的集合都實現了 Spliterator 介面以支援 Stream。可以這樣理解,Spliterator 定義了資料集合流入流水線的方式。
定義流水線節點
filter()
是 Stream 中定義的方法,在 ReferencePipeline
中實現,返回 StatelessOp
的例項。
可以看到 filter()
接收的引數是謂詞,可以用 lambda
表示式。 StatelessOp
的建構函式接收了 this
,也就是 ReferencePipeline.Head
例項的引用。並且實現了 AbstractPipeline
中定義的 opWrapSink
方法。
@Overridepublic final Stream < P_OUT > filter ( Predicate <? super P_OUT > predicate ) { Objects . requireNonNull ( predicate ); return new StatelessOp < P_OUT , P_OUT >( this , StreamShape . REFERENCE , StreamOpFlag . NOT_SIZED ) { @Override Sink < P_OUT > opWrapSink ( int flags , Sink < P_OUT > sink ) { return new Sink . ChainedReference < P_OUT , P_OUT >( sink ) { @Override public void begin ( long size ) { downstream . begin (- 1 ); } @Override public void accept ( P_OUT u ) { if ( predicate . test ( u )) downstream . accept ( u ); } }; }
};
} sorted()
和 limit()
的返回值和也都是 Stream 的實現類,並且都接收了 this
。不同的是 sorted()
返回的是 ReferencePipeline.StatefulOp 的子類 SortedOps.OfRef 的例項。 limit()
返回的 ReferencePipeline.StatefulOp 的例項。
現在可以粗略地看到,這些中間操作(不管是無狀態的 filter()
,還是有狀態的 sorted()
和 limit()
都只是返回了一個包含上一節點引用的中間節點。有點像 HashMap 中的反向單向連結串列。就這樣把一個個中間操作拼接到了控制資料流入的 Head 後面,但是並 沒有開始做任何資料處理的動作 。
這也就是 Stream 延時執行的特性原因之所在。
參見附錄I會發現 StatelessOp 和StatefulOp 初始化的時候還會將當前節點的引用傳遞給上一個節點。
previousStage.nextStage = this;
所以各個節點組成了一個雙向連結串列的結構。
組裝流水線
最後來看一下終止操作 .collect() 接收的是返回型別對應的 Collector。
此例中的 Collectors.toList() 是 Collectors 針對 ArrayList 的建立的 CollectorImpl 的例項。
@Override @SuppressWarnings("unchecked") public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) { A container; if (isParallel() && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT)) && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) { container = collector.supplier().get(); BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator(); forEach(u -> accumulator.accept(container, u)); } else { container = evaluate(ReduceOps.makeRef(collector));//1 } return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH) ? (R) container : collector.finisher().apply(container); }
先忽略並行的情況,來看一下加註釋了1的程式碼:
-
ReduceOps.makeRef
接收此 Collector 返回了一個ReduceOp
(實現了TerminalOp
介面)的例項。 -
返回的
ReduceOp
例項又被傳遞給 AbstractPipeline 中的evaluate()
方法。 -
在
evaluate
中,呼叫了ReduceOp
例項的evaluateSequential
方法,並將上流水線上最後一個節點的引用和sourceSpliterator
傳遞進去。
@Override public <P_IN> R evaluateSequential(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) { return helper.wrapAndCopyInto(makeSink(), spliterator).get(); }
-
然後呼叫
ReduceOp
例項的makeSink()
方法返回其makeRef()
方法內部類ReducingSink
的例項。 -
接著
ReducingSink
的例項作為引數和spliterator
一起傳入最後一個節點的wrapAndCopyInto()
方法,返回值是 Sink 。
啟動流水線
流水線組裝好了,現在就該啟動流水線了。這裡的核心方法是 wrapAndCopyInto
,根據方法名也能看出來這裡應該做了兩件事, wrapSink()
和 copyInto()
。
wrapSink()
將最後一個節點建立的 Sink 傳入,並且看到裡面有個 for 迴圈。參見附錄I可以發現
每個節點都記錄了上一節點的引用( previousStage
)和每一個節點的深度( depth
)。
所以這個 for 迴圈是從最後一個節點開始,到第二個節點結束。每一次迴圈都是將上一節點的 combinedFlags
和當前的 Sink 包起來生成一個新的 Sink 。這和前面拼接各個操作很類似,只不過拼接的是 Sink 的實現類的例項,方向相反。
(Head.combinedFlags, (StatelessOp.combinedFlags, (StatefulOp.combinedFlags,(StatefulOp.combinedFlags ,TerminalOp.sink)))
@Override @SuppressWarnings("unchecked") final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) { Objects.requireNonNull(sink); for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) { sink = p.opWrapSink(p.previousStage.combinedFlags, sink); } return (Sink<P_IN>) sink; }
copyInto()
終於到了要真正開始迭代的時候,這個方法接收兩個引數 Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator
。 wrappedSink
對應的是 Head
節點後面的第一個操作節點(它相當於這串 Sink 的頭), spliterator
對應著資料來源。
這個時候我們回過頭看一下 Sink 這個介面,它繼承自 Consumer 介面,又定義了 begin()
、 end()
、 cancellationRequested()
方法。Sink 直譯過來是水槽,如果把資料流比作水,那水槽就是水會流過的地方。 begin()
用於通知水槽的水要過來了,裡面會做一些準備工作,同樣 end()
是做一些收尾工作。 cancellationRequested()
是原來判斷是不是可以停下來了。Consumer 裡的 accept()
是消費資料的地方。
@Override final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) { Objects.requireNonNull(wrappedSink); if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) { wrappedSink.begin(spliterator.getExactSizeIfKnown());//1 spliterator.forEachRemaining(wrappedSink);//2 wrappedSink.end();//3 } else { copyIntoWithCancel(wrappedSink, spliterator); } }
有了完整的水槽鏈,就可以讓水流進去了。 copyInto()
裡做了三個動作:
-
通知第一個水槽(Sink)水要來了,準備一下。
-
讓水流進水槽(Sink)裡。
-
通知第一個水槽(Sink)水流完了,該收尾了。
突然想到宋丹丹老師的要把大象放冰箱要幾步?
注:圖中藍色線表示資料實際的處理流程。
每一個 Sink 都有自己的職責,但具體表現各有不同。無狀態操作的 Sink 接收到通知或者資料,處理完了會馬上通知自己的 下游。有狀態操作的 Sink 則像有一個緩衝區一樣,它會等要處理的資料處理完了才開始通知下游,並將自己處理的結果傳遞給下游。
例如 sorted()
就是一個有狀態的操作,一般會有一個屬於自己的容器,用來記錄處自己理過的資料的狀態。 sorted()
是在執行 begin 的時候初始化這個容器,在執行 accept 的時候把資料放到容器中,最後在執行 end 方法時才正在開始排序。排序之後再將資料,採用同樣的方式依次傳遞給下游節點。
最後資料流到終止節點,終止節點將資料收集起來就結束了。
然後就沒有然後了, copyInto()
返回型別是 void
,沒有返回值。
wrapAndCopyInto()
返回了 TerminalOps
建立的 Sink,這時候它裡面已經包含了最終處理的結果。呼叫它的 get()
方法就獲得了最終的結果。
回顧
再來回顧一下整個過程。首先是將 Collection 轉化為 Stream,也就是流水線的頭。然後將各個中間操作節點像拼積木一樣拼接起來。每個中間操作節點都定義了自己對應的 Sink,並重寫了 makeSink()
方法用來返回自己的 Sink 例項。直到終止操作節點出現時才開始將 Sink 例項化並串起來。然後就是上面提到的那三步:通知、資料流入、結束。
本文介紹和分析了最常規的 stream 用法和實現,實際上 stream 還有很多高階用法,比如利用協程實現的並行流。感興趣的同學可以研究一下。當然既然是高階用法,用的時候一定要多注意。
參考
-
深入理解Java Stream 流水線(http://www.cnblogs.com/CarpenterLee/p/6637118.html)
-
Java8 Stream 原理深度解析(http://www.cnblogs.com/Dorae/p/7779246.html)
-
Java 8 Stream 探祕 (https://colobu.com/2014/11/18/Java-8-Stream/)
附錄I
以下是初始化 Head 節點和中間操作的實現。
/** * Constructor for the head of a stream pipeline. * * @param source {@code Spliterator} describing the stream source * @param sourceFlags the source flags for the stream source, described in * {@link StreamOpFlag} * @param parallel {@code true} if the pipeline is parallel */ //初始化Head節點的時候會執行 AbstractPipeline(Spliterator<?> source, int sourceFlags, boolean parallel) { this.previousStage = null; this.sourceSpliterator = source; this.sourceStage = this; this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK; // The following is an optimization of: // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE); this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE; this.depth = 0; this.parallel = parallel; } /** * Constructor for appending an intermediate operation stage onto an * existing pipeline. * * @param previousStage the upstream pipeline stage * @param opFlags the operation flags for the new stage, described in * {@link StreamOpFlag} */ //初始化中間操作StatelessOp和StatefulOp的時候會執行 AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) { if (previousStage.linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); previousStage.linkedOrConsumed = true; previousStage.nextStage = this; this.previousStage = previousStage; this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK; this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags); this.sourceStage = previousStage.sourceStage; if (opIsStateful()) sourceStage.sourceAnyStateful = true; this.depth = previousStage.depth + 1; }
全文完
以下文章您可能也會感興趣:
-
OpenResty 不完全指南
-
從 ThreadLocal 的實現看雜湊演算法
我們正在招聘 Java 工程師,歡迎有興趣的同學投遞簡歷到 [email protected] 。
杏仁技術站
長按左側二維碼關注我們,這裡有一群熱血青年期待著與您相會。