1. 程式人生 > >我的android多執行緒程式設計之路(2)之RxJava Schedulers原始碼分析

我的android多執行緒程式設計之路(2)之RxJava Schedulers原始碼分析

寫在伊始

上一篇介紹了執行緒的一些基礎知識和工作這麼久以後對於多執行緒部分的使用經驗之路,這篇主要對RxJava執行緒控制部分進行分析。

RxJava(本文就RxJava2.0分析)

說實話,近一年多一直在用rxjava進行專案架構的編寫及封裝及一些非同步請求的處理等等。真的很好用,但本文只對其執行緒部分進行分析。如果你想學習rxjava的話,推薦您看一下如下幾篇文件,也是一點一點學過來的,希望可以幫到您。

RxJava之Scheduler (執行緒池)(附帶原始碼分析)

網上找的對於Scheduler 的2.0介紹都是寫的扔物線大神對於Scheduler在RxJava1.0中的介紹,所以對於新操作符single()是自己的理解什麼的,不對的還請指出)。

在RxJava 中,Scheduler ——排程器,相當於執行緒控制器,RxJava 通過它來指定每一段程式碼應該執行在什麼樣的執行緒。RxJava 已經內建了幾個 Scheduler ,它們已經適合大多數的使用場景,他們分別為:

  • Schedulers.io():最常用的Scheduler ,主要用於IO 操作(讀寫檔案、讀寫資料庫、網路資訊互動等)所使用的 Scheduler。行為模式和 newThread() 差不多,區別在於 io() 的內部實現是是用一個無數量上限的執行緒池,可以重用空閒的執行緒,因此多數情況下 io() 比 newThread() 更有效率。不要把計算工作放在 io() 中,可以避免建立不必要的執行緒。
  • Schedulers.computation():計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的執行緒池,大小為 CPU 核數。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU。
  • Schedulers.newThread():總是啟用新執行緒,並在新執行緒執行操作。
  • Schedulers.trampoline(),直接在當前執行緒執行,相當於不指定執行緒。這是預設的 Scheduler。代替RxJava1.0的immediate 排程器。
  • Schedulers.single(): (我沒使用過)擁有一個執行緒單例,所有的任務都在這一個執行緒中執行,當此執行緒中有任務執行時,其他任務將會按照先進先出的順序依次執行。
  • Schedulers.from(executor),接收一個Executor,允許我們自定義Scheduler。
  • 另外, Android 還有一個專用的 AndroidSchedulers.mainThread(),它指定的操作將在 Android 主執行緒執行。(也就是onNext方法的執行執行緒)

有了這幾個 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 兩個方法來對執行緒進行控制了。subscribeOn(): 指定 subscribe() 所發生的執行緒,即 Observable.OnSubscribe 被啟用時所處的執行緒。或者叫做事件產生的執行緒。 observeOn(): 指定 Subscriber 所執行在的執行緒。或者叫做事件消費的執行緒。

//例如
DisposableSubscriber sub = RetorfitUtil.getInstance().create().getData()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                //型別轉換   變成你想要的型別
                .onBackpressureBuffer()
                ...

下文會對這些Scheduler進行單獨的介紹。但是在這之前,先看一下Schedulers這個類。

Schedulers分析(如果有理解的不對的,還請多多指出)

話不多說,請擼這段原始碼。在subscribeOn這個方法中,無論我們使用哪一個Scheduler,都會首先走進這個類中。我們發現,在一開始的時候,他用一個靜態程式碼塊,初始化了五個Scheduler 供我們使用。

    static final class SingleHolder {
        static final Scheduler DEFAULT = new SingleScheduler();
    }

    ....
    static final class NewThreadHolder {
        static final Scheduler DEFAULT = new NewThreadScheduler();
    }

    static {
        SINGLE = RxJavaPlugins.initSingleScheduler(new Callable<Scheduler>() {
            @Override
            public Scheduler call() throws Exception {
                return SingleHolder.DEFAULT;
            }
        });

        ....

        IO = RxJavaPlugins.initIoScheduler(new Callable<Scheduler>() {
            @Override
            public Scheduler call() throws Exception {
                return IoHolder.DEFAULT;
            }
        });

        TRAMPOLINE = TrampolineScheduler.instance();//預設Scheduler 
        ....
    }

一旦我們呼叫這些方法,他就會把我們的任務放進對應的Scheduler (執行緒池)中進行執行。這也就是執行緒切換的概念。



    ...

    public static Scheduler io() {
        return RxJavaPlugins.onIoScheduler(IO);
    }
    ...

    public static Scheduler single() {
        return RxJavaPlugins.onSingleScheduler(SINGLE);
    }

那麼他在new這些Scheduler的時候又做了些什麼呢?這些執行緒池又是如何的建立的呢?一個一個跟進原始碼看一看我們。

Schedulers.io():

之前說這個Scheduler(執行緒池)的內部實現是是用一個無數量上限的執行緒池,可以重用空閒的執行緒,那麼他是如何做的呢?

首先,他們所有的Thread都是由RxThreadFactory建立,通過初始化獨特的屬於自己的字串常量來確保不同的Scheduler在建立不同的執行緒的標誌性。如下(其餘同,就不介紹此部分了):

這裡寫圖片描述

再者發現一個有意思的事情,rxjava所有的執行緒都被設定為守護執行緒。如下圖:也就是它的執行緒不會影響JVM的退出,關於守護執行緒和使用者執行緒不明白的,請自行百度。

這裡寫圖片描述

之前在使用的過程中,一直以為無數量上限的執行緒池的底層是newCachedThreadPool實現的,後來發現不然,在IoScheduler的CachedWorkerPool類中你會發現,它建立了一個建立一個定長執行緒池,並且大小是1.

這裡寫圖片描述

那麼無數量限制呢?懵逼了!!後來發現,Schedulers.io()所用的執行緒池(到這裡不對了,準確的來說,應該叫的執行緒組,因為他是由一個一個的定長執行緒池組成),是一個由ConcurrentLinkedQueue(頂層父類其實是Collection)組成的多個CachedWorkerPool,通過CachedWorkerPool的get()方法每次從Queue中獲取可用worker執行緒,來進行任務的操作。如下圖:

這裡寫圖片描述

當然,如果expiringWorkerQueue沒有worker執行緒,則會單獨為此執行緒建立一個worker執行緒

這裡寫圖片描述

Schedulers.newThread()

Schedulers.newThread(): 總是啟用新執行緒,並在新執行緒執行操作。那麼,在NewThreadScheduler它是怎麼做的呢?如下圖:圖中第一個和第二個箭頭說明不同的Scheduler在建立不同的執行緒的標誌性。第三個箭頭說明他每次建立一個執行緒就把這個執行緒初始化為worker執行緒,並執行操作。

這裡寫圖片描述

這裡寫圖片描述

Schedulers.computation()

計算所使用的 Scheduler。這個執行緒池由newFixedThreadPool直接實現。這個計算指的是 CPU 密集型計算。那為什麼這個執行緒池適合執行Cpu密集型計算呢,因為它的個數等於CPU 核數,能夠最大效率的使用CPU,提高效率。如圖:

獲取CPU核數:

這裡寫圖片描述

建立固定大小的執行緒池:

這裡寫圖片描述

Schedulers.computation()模式下是用RoundRobin(是一種演算法,請自行百度)方式輪訓獲取worker執行緒,這就是為什麼起名叫叫EventLoop吧,如下圖。

這裡寫圖片描述

在EventLoopWorker中,會呼叫schedule方法執行執行緒中的任務。

這裡寫圖片描述

Schedulers.trampoline():(預設使用的執行緒組)

Schedulers.trampoline()的意思是直接在當前執行緒執行,相當於不指定執行緒。這是預設的 Scheduler,代替RxJava1.0的immediate 排程器。Schedulers.trampoline()所用的執行緒組,組合了一個PriorityBlockingQueue,以提交事件的時間進行排序,依次執行任務,如下圖。

這裡寫圖片描述

這個單詞你懂了就懂了。enqueue,不信你去查查!

這裡寫圖片描述

Schedulers.single()

Schedulers.single(): 擁有一個執行緒單例,所有的任務都在這一個執行緒中執行,當此執行緒中有任務執行時,其他任務將會按照先進先出的順序依次執行。他只有一個worker執行緒,如圖:

這裡寫圖片描述

在ScheduledWorker中,會執行我們所有的任務。

這裡寫圖片描述

在CompositeDisposable中,有一個OpenHashSet回來儲存我們的任務,在add方法中,通過同步程式碼塊,保證同一時間只有一個任務在執行。

這裡寫圖片描述

Schedulers.from(executor):

接收一個Executor,允許我們自定義Scheduler。關於這方面,先不準備講解,使用較少,如有需要請告知,我可以一起和你百度,共同實踐哈哈!

問題求解

還有一個任務,就是在RxJavaPlugins這個類中,初始化了很多的handler,是為了Funtion中回撥資料嗎?還是什麼?沒找到相關的地方,求幫助!

寫在最後

還是那句話,希望得到大家的中肯的意見,讓我認識自己的不足,一起學習,共同進步。這一篇就先到這裡,分析的過程中,發現寫的真是無法企及的高度。自殘中……