1. 程式人生 > >RxJava2.0中flatMap操作符用法和原始碼分析(五)

RxJava2.0中flatMap操作符用法和原始碼分析(五)

flatMap基本使用

flatMap是變換操作符,使用一個指定的函式對原始Observable發射的每一項資料執行變換操作,這個函式返回一個本身也發射資料的Observable,然後flatMap合併這些Observable發射的資料,最後將合併後的結果當作它自己的資料序列發射。

注意:flatMap對這些Observable發射的資料做的是合併(merge)操作,因此它們可能是交錯的。

image

我們可以用程式碼示例說明:

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public
void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onNext(3); } }).flatMap(new Function<Integer, ObservableSource<String>>() { @Override public ObservableSource<String> apply(Integer integer) throws
Exception { String flatMap = "I am value " + integer; return Observable.just(flatMap); } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { println("flatMap : accept : " + s + "\n"); } }); 輸出結果: flatMap : accept : I am value 1
flatMap : accept : I am value 2 flatMap : accept : I am value 3

下面我們將從原始碼的角度來分析下:

這裡我們首先使用create操作符建立一個Observable,並且發射指定的資料。關於create如何建立Observable物件,我們這裡不做分析,前面文章中已經說明。我們直接分析flatMap的原始碼:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
    return flatMap(mapper, false);
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors) {
    return flatMap(mapper, delayErrors, Integer.MAX_VALUE);
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency) {
    return flatMap(mapper, delayErrors, maxConcurrency, bufferSize());
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
        boolean delayErrors, int maxConcurrency, int bufferSize) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    if (this instanceof ScalarCallable) {
        @SuppressWarnings("unchecked")
        T v = ((ScalarCallable<T>)this).call();
        if (v == null) {
            return empty();
        }
        return ObservableScalarXMap.scalarXMap(v, mapper);
    }
    return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}

flatMap方法具有多個過載方法,在呼叫flatMap方法時,首先需要傳入一個Function的物件,但對是Function類中的泛型引數有要求,第二個引數必須是繼承自ObservableSource類,然後在抽象的apply方法中返回該型別引數。而我們都知道Observable就是繼承自該類。所以在Function中,最後必須返回的是一個Observable的物件。

我們知道在執行操作符方法時,最後都會生成一個具體的Observable物件,這個物件作為Observable的具體實現類,裡面實現了具體的業務邏輯,然後通過多型的方式進行呼叫,而flatMap中對應的就是ObservableFlatMap類。

public final class ObservableFlatMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
    final boolean delayErrors;
    final int maxConcurrency;
    final int bufferSize;

    public ObservableFlatMap(ObservableSource<T> source,
            Function<? super T, ? extends ObservableSource<? extends U>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
        super(source);
        this.mapper = mapper;
        this.delayErrors = delayErrors;
        this.maxConcurrency = maxConcurrency;
        this.bufferSize = bufferSize;
    }

    .......

}

ObservableFlatMap的構造方法中,傳入了五個引數。如下:

public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
        boolean delayErrors, int maxConcurrency, int bufferSize) {
    ......
    return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}

其中this引數,代表的是當前建立的Observable物件,本例子中使用的是create操作符建立的物件,所以具體的物件就是ObservableCreate例項。
mapper就是Function中的第二個引數,是一個新的Observable物件。其他引數都是系統預設引數。

完成了被觀察者的準備工作後,接下來就是訂閱觀察者了。從之前的分析我們知道在訂閱觀察者時,實際呼叫的都是subscribeActual的方法。那麼現在我們具體來分析ObservableFlatMap類中的方法:
ObservableFlatMap#subscribeActual

@Override
public void subscribeActual(Observer<? super U> t) {

    if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
        return;
    }

    source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}

裡面再次呼叫了source.subscribe方法,而我們剛才分析了這個source就是this這個引數指代的物件,這裡即ObservableCreate物件。

Observable#subscribe

@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}

引數observer就是上面新建的MergeObserver物件。由於subscribeActual是個抽象方法,這裡執行的是ObservableCreate中的方法。我們在看一下該類中的方法:

ObservableCreate#subscribeActual

@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);

    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

這裡面的通過呼叫ObservableOnSubscribe類中的subscribe方法,開發發射資料。但是引數observer就是剛才的MergeObserver物件。
在通過onNext發射資料時,其實執行的就是MergeObserver中的onNext的物件。

@Override
public void onNext(T t) {
    // safeguard against misbehaving sources
    if (done) {
        return;
    }
    ObservableSource<? extends U> p;
    try {
        p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        s.dispose();
        onError(e);
        return;
    }

    if (maxConcurrency != Integer.MAX_VALUE) {
        synchronized (this) {
            if (wip == maxConcurrency) {
                sources.offer(p);
                return;
            }
            wip++;
        }
    }

    subscribeInner(p);
}

方法中,首先接收了發射的資料,然後在通過mapper.apply(t)將資料轉化為具有發射資料的ObservableSource物件,這裡就對應了之前所說的Function將發射的資料轉換為ObservableSource的物件。然後將新生成的ObservableSource的資料發射,然後觀察者接收新發射的資料。這裡面程式碼很多邏輯比較複雜就不再一一分析。