1. 程式人生 > >Java8流Stream中間操作、終止操作執行流程原始碼分析

Java8流Stream中間操作、終止操作執行流程原始碼分析

通過前面的部落格的介紹,我們知道Stream有一個源,0個或者多箇中間操作,以及一個終止操作。Stream只有遇到終止操作,它的源才開始執行遍歷操作,而且只會進行一次遍歷,而不是每個操作都執行一次遍歷。今天,我們就從原始碼的層面來分析一下JDK這一塊是怎麼實現的。

首先看下面一段程式碼,下面將以這一段程式碼來進行分析:

List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
list.stream().filter(i -> i % 2 == 0)
        .limit(3)
        .map(String::valueOf)
        .forEach(System.out::println);

上面這段程式碼,中間操作,短路操作,終止操作全部都有了。

首先list.stream()會返回一個Stream物件。我們可以跟進去,看看返回的到底是個什麼物件。

default Stream<E> stream() {
    return StreamSupport.stream(spliterator(), false);
}

繼續跟到StreamSupport.stream:

public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
        Objects.requireNonNull(spliterator);
        return
new ReferencePipeline.Head<>(spliterator, StreamOpFlag.fromCharacteristics(spliterator), parallel); }

可以看到返回的是一個ReferencePipeline.Head的例項。再跟進去,可以發現Head是ReferencePipeline類的一個靜態內部類,並且Head還繼承了ReferencePipeline,JDK註釋說它表示的是ReferencePipeline的一個源階段。

再來看看ReferencePipeline這個類。它是一個抽象類,繼承了AbstractPipeline這個類並且實現了Stream介面。我們將上面的操作跟進去看實現,會發現filter,limit,map,forEach這些方法都是在這個類中實現的。也就是說ReferencePipeline這個類提供了大量的對流的操作的實現。

繼續分析ReferencePipeline這個類,它的父類AbstractPipeline中定義了三個個AbstractPipeline型別的變數:sourceStage(源階段),previousStage(上游pipeline,前一階段),nextStage(下一階段)。根據JDK文件的對著三個屬性的說明可以知道:ReferencePipeline實際上是一個雙向連結串列的資料結構。而ReferencePipeline對Stream的操作做了實現,每一箇中間操作都會返回一個Stream物件,實際上就是ReferencePipeline物件,因此可以得到結論:Stream底層是通過雙向連結串列來實現的。

經過上面的分析,我們就知道了流的源階段返回的Stream和中間階段返回的Stream到底是什麼了。源階段返回的是ReferencePipeline.Head物件,而中間操作階段返回的是ReferencePipeline物件。在流的源階段和中間階段僅僅只是返回了ReferencePipeline物件,並沒有做其他的方法呼叫操作,這也是為什麼流在執行中間操作並不會有任何的輸出或者結果產生。

那麼這些中間操作在執行終止操作的時候是怎麼被呼叫的呢?還有流的短路是怎麼實現的呢?

接著上面,跟進filter方法的實現:

public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        Objects.requireNonNull(predicate);
        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        if (predicate.test(u))
                            downstream.accept(u);
                    }
                };
            }
        };
    }

可以看到僅僅只是new一個StatelessOp物件進行返回。跟進去StatelessOp發現它也是ReferencePipeline的一個靜態內部類,並且也繼承了ReferencePipeline,也印證了上面說的中間操作返回的都是ReferencePipeline物件。上面的程式碼還可以看到,StatelessOp物件對opWrapSink這個方法做了實現:返回一個Sink.ChainedReference物件。Sink.ChainedReference又是一個靜態內部類,並且繼承了Sink。並且Sink.ChainedReference中有一個Sink型別的屬性downstream,可以知道:Sink.ChainedReference是一個單鏈表的資料結構。

跟到父類裡面檢視JDK對於opWrapSink這個方法的說明:接受一個用於接受操作結果的Sink物件並且返回一個接受當前操作輸入型別元素的Sink。有點迷惑,檢視一下這個Sink到底是什麼。跟進去看JDK說明:Sink是Cosumer的一個子類,用來引導值來通過流管道的各個階段,通過提供一些額外的方法實現來管理大小資訊、控制流動等等。可以看到裡面提供了一個accept方法,引數接受的是流中的值,用來定義一種行為,這個行為也就是流的操作。因此可以總結出的是:對於流的各種操作,底層其實是被封裝成一個個的Sink(對於引用型別其實是Sink.ChainedReference)物件來進行操作的。而Sink.ChainedReference又是一種單鏈表的資料結構,所以,流中的操作會以單鏈表的形式被連結起來。而且檢視各種中間操作的實現,可以發現opWrapSink這個方法返回的Sink.ChainedReference物件的accept方法實現中,都會呼叫下游Sink的accept方法。這無疑就是行為的一種串聯。這也就解釋了,流在遇到終止操作過後,是怎麼實現的遍歷的時候是每個元素經歷一連串的操作,然後再遍歷下一個元素。

最後在分析一下流的終止操作是什麼樣的以及流的短路是怎麼實現的。

最後的forEach方法跟進去,這裡肯定進入的是ReferencePipeline的forEach方法,因為Head是源的Pipeline。

public void forEach(Consumer<? super P_OUT> action) {
    evaluate(ForEachOps.makeRef(action, false));
}

看下ForEachOps.makeRef的實現:

public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action, boolean ordered) {
        Objects.requireNonNull(action);
        return new ForEachOp.OfRef<>(action, ordered);
}

看到最終返回一個ForEachOp.OfRef物件,跟進去發現ForEachOp.OfRef是ForEachOp的一個靜態內部類並且繼承了ForEachOp。並且可以檢視到ForEachOp這個類有一個祖先是Sink。那說明ForEachOps.makeRef這個方法的作用就是把forEach操作封裝成Sink物件。

在檢視evaluate方法實現:

final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;

        return isParallel() 
            ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) 
            : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}

在跟到序列流的實現:

public <S> Void evaluateSequential(PipelineHelper<T> helper, Spliterator<S> spliterator) {
    return helper.wrapAndCopyInto(this, spliterator).get();
}

這裡的第一個引數PipelineHelper型別其實是AbstractPipeline的父類,而AbstractPipeline又是ReferencePipeline的父類。再跟進helper.wrapAndCopyInto方法:

final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
    copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
    return sink;
}
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
    Objects.requireNonNull(sink);

    for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
        sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
    }
    return (Sink<P_IN>) sink;
}
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    Objects.requireNonNull(wrappedSink);

    if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
        wrappedSink.begin(spliterator.getExactSizeIfKnown());
        spliterator.forEachRemaining(wrappedSink);
        wrappedSink.end();
    }
    else {
        copyIntoWithCancel(wrappedSink, spliterator);
    }
}

首先看wrapSink方法,可以看到通過ReferencePipeline的雙向連結串列,從最後一個操作(也就是終止操作)往前遍歷,將所有的操作都串聯起來,最終返回一個指向第一個操作的Sink引用。這裡的p.opWrapSink呼叫的其實就是每個操作返回的ReferencePipeline物件的opWrapSink方法。前面對這個方法做過了分析。

再來分析copyInto方法。可以看到邏輯分為短路操作和非短路操作,如果有短路操作就會執行下面的copyIntoWithCancel方法,否則指向上面的邏輯,最終會對封裝好的sink執行accept,而每一個sink的accept方法裡優惠呼叫下游sink的accept方法,來實現操作串聯。

最後分析下短路:

final <P_IN> void copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        @SuppressWarnings({"rawtypes","unchecked"})
        AbstractPipeline p = AbstractPipeline.this;
        while (p.depth > 0) {
            p = p.previousStage;
        }
        wrappedSink.begin(spliterator.getExactSizeIfKnown());
        p.forEachWithCancel(spliterator, wrappedSink);
        wrappedSink.end();
    }
final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
        do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink));
    }

可以看到短路流在執行遍歷的時候會呼叫Sink封裝的cancellationRequested方法,如果返回出就不會進行後面的操作。也就是說短路流在操作的時候都要重寫這個方法,例如limit操作,肯定是對傳入的引數m進行自減– 操作,等到m<=0,cancellationRequested這個方法肯定會返回true。
可以跟進去看下limit操作的實現:

public final Stream<P_OUT> limit(long maxSize) {
        if (maxSize < 0)
            throw new IllegalArgumentException(Long.toString(maxSize));
        return SliceOps.makeRef(this, 0, maxSize);
    }

下面是SliceOps.makeRef返回的ReferencePipeline.StatefulOp物件對於opWrapSink方法的實現:

Sink<T> opWrapSink(int flags, Sink<T> sink) {
    return new Sink.ChainedReference<T, T>(sink) {
        long n = skip;
        long m = limit >= 0 ? limit : Long.MAX_VALUE;

        @Override
        public void begin(long size) {
            downstream.begin(calcSize(size, skip, m));
        }

        @Override
        public void accept(T t) {
            if (n == 0) {
                if (m > 0) {
                    m--;
                    downstream.accept(t);
                }
             }
             else {
                 n--;
             }
        }

        @Override
        public boolean cancellationRequested() {
            return m == 0 || downstream.cancellationRequested();
        }
    };
}