少年,你可能對RxJava的Zip操作有些誤會
閱讀時間 5min

image.png
專案中有這樣一個場景,資訊詳情頁展示需要傳送兩個網路請求,一個獲取資訊資訊,一個獲取評論資訊,只有兩部分內容都請求都完成,才能進行頁面的展示,這是一個非常常見的業務場景,我們直接使用了RxJava的Zip操作符來實現,虛擬碼如下:
ApiService service; // Retrofit 產生的請求介面 Observable.zip(service.getNewsContent() ,service.getNewsComments() ,new Func() {}) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe((data) -> { //展示資訊和評論 })
然而我今天在除錯另外一個問題時,當把網路延遲設定成5s時,明顯發現 getNewsContent
和 getNewsComments
請求居然是序列的,這可和我們對Zip的主觀印象不符呀~

image.png
通常來說這種業務場景裡面兩個網路請求可以併發來達到更快的展示效果,那這裡Zip操作符裡面的兩個請求為什麼會序列呢?看了一眼註釋恍然大悟,不是Zip有毛病,是之前使用這個操作符號的少年姿勢有些不對,不知道聰明又伶俐的你發現問題了沒?
是的,就是排程器的問題!我們來看看Zip操作符的程式碼:
public static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, final Func2<? super T1, ? super T2, ? extends R> zipFunction) { return just(new Observable<?>[] { o1, o2 }).lift(new OperatorZip<R>(zipFunction)); }
很簡單,直接把傳入的兩個Observable合成一個數組,通過just發出來,然後追加一個lift操作,所以重點就是這個lift操作中的OperatorZip怎麼處理了。
如果你研究過lift操作,自然知道lift接受一個Operator介面,通過Operator介面的call方法,產生一個Subscriber去訂閱lift左邊的Observable,在這個例子裡面就是 just(new Observable<?>[] { o1, o2 })
。
public Subscriber<? super Observable[]> call(final Subscriber<? super R> child) { //... final ZipSubscriber subscriber = new ZipSubscriber(child, zipper, producer); return subscriber; }
程式碼也很簡單,直接new了一個ZipSubscriber:
private final class ZipSubscriber extends Subscriber<Observable[]> { final Zip<R> zipper; @Override public void onNext(Observable[] observables) { zipper.start(observables, producer); } }
到此,你可以再捋一下了,前面zip傳入的兩個網路請求,getNewsContent和getNewsComments被just打包成一個新流,被ZipSubscriber接收,那麼在它的onNext中它通過 zipper.start(observables, producer);來執行兩個原始網路請求的,看它是怎麼執行的吧:
public void start(Observable[] os, AtomicLong requested) { for (int i = 0; i < os.length; i++) { os[i].unsafeSubscribe((InnerSubscriber) subscribers[i]); } }
是的,你沒看錯,直接在一個for裡面,通過unsafeSubscribe訂閱來觸發網路請求了,所以就會有上面的結論,兩個網路請求串行了,要解決這個問題也很簡單,你只需要給每個流追加一個排程器,讓它們自己併發就可以啦~
ApiService service; // Retrofit 產生的請求介面 Observable.zip(service.getNewsContent().subscribeOn(Schedulers.io()) ,service.getNewsComments().subscribeOn(Schedulers.io()) ,new Func() {}) .observeOn(AndroidSchedulers.mainThread()) .subscribe((data) -> { //展示資訊和評論 })
少年,這個誤會解釋清楚了嗎?