1. 程式人生 > >ThreadPoolExecutor詳細介紹

ThreadPoolExecutor詳細介紹

在之前的文章中總結了Java執行緒的兩種建立方式:繼承Thread和實現Runnable介面,在Java中使用多執行緒不一定非得用此兩種方式,JDK為我們封裝了大量的執行緒實用類,本文主要對Java中的執行緒池ThreadPoolExecutor做一下詳細的介紹。

執行緒池的好處

使用執行緒池比通過Thread或者Runnable直接實現多執行緒有哪些好處呢?

1.減少系統資源消耗:執行緒池中執行緒能夠被複用,不需要每次建立一個執行緒執行任務,執行完任務之後進行執行緒銷燬,減少執行緒建立與執行緒銷燬帶來的系統開銷以及時間片輪轉和上下文切換帶來的開銷;

2.提高執行任務的響應速度

:將任務提交到執行緒池中,一旦執行緒池中有空閒執行緒,可以直接呼叫執行緒執行任務,節省執行緒建立所帶來的時間;

3.執行緒的統一排程管理:能夠對執行緒池中的執行緒都進行統一的管理、排程以及銷燬;

ThreadPoolExecutor

JDK提供了ThreadPoolExecutor執行緒池,ExecutorService的各種執行緒池策略都是基於ThreadPoolExecutor實現的,因此有必要弄清楚ThreadPoolExecutor;當我們想執行緒池提交一個任務時,需要經過什麼樣的步驟?有什麼不同的處理結果呢?

執行緒池組成部分執行緒(核心執行緒與非核心執行緒)、任務佇列

(儲存處於等待的任務)、執行緒工廠(用於建立執行緒)、異常處理handler非核心執行緒空閒超時時間等部分組成;

執行緒池的狀態
1.RUNNING:接受新的任務,執行任務佇列裡面的任務;
2.SHUDOWN:不再接受新的任務,但會執行任務佇列裡面的任務;
3.STOP:不再接受新的任務,也不執行任務佇列裡面的任務;同時中斷正在執行的任務;
4.TIDYING:所有的任務已經完成,工作執行緒為0;TIDYING會呼叫terminated()進入TERMINATED狀態;
5.TERMINATED:terminated()函式執行完成,執行緒池終止;

執行緒池狀態間的轉換
RUNNING -> SHUTDOWN

:呼叫shutdown()函式
RUNNING -> STOP:呼叫shutdownNow()函式
SHUTDOWN -> STOP:呼叫shutdownNow()函式
SHUTDOWN -> TIDYING:任務佇列和工作執行緒都為空(等待任務佇列裡面的任務執行完成,工作執行緒終止)
STOP -> TIDYING:工作執行緒都為空(當進入STOP狀態後中斷所有工作執行緒進入TIDYING)
TIDYING -> TERMINATED:呼叫terminated()函式執行完成(進入TIDYING狀態會呼叫terminated()函式)

往執行緒池中提交一個任務,要經過一下步驟:

步驟1:如果正在執行的執行緒數量小於 corePoolSize,那麼馬上建立執行緒執行這個任務,而不會把這個任務新增到任務佇列中;

步驟2:如果正在執行的執行緒數量大於或等於 corePoolSize,那麼將這個任務插入任務佇列中,不會馬上執行該任務,等待執行的執行緒執行完當前的任務空閒後,再從任務佇列中取出任務繼續執行。

步驟3:如果這時候任務佇列滿了,無法繼續插入到任務佇列,如果此時正在執行的執行緒數量小於 maximumPoolSize,那麼還是會建立一個執行緒來執行這個任務;

步驟4:如果任務佇列滿了,並且正在執行的執行緒數量大於或等於 maximumPoolSize,那麼執行緒池會丟擲異常,告訴呼叫者“我不能再接受任務了”,通過異常處理handler進行處理。

當然上述步驟不是完全的步驟,當在執行過程中發現執行緒池的狀態為非RUNNING狀態時,則丟擲異常,拒絕此任務,交給異常處理handler進行處理;

任務執行策略

根據上面的介紹我們可以知道,當執行緒池正在執行的執行緒大於或者等於corePoolSize時,任務佇列的大小直接決定任務的執行;任務佇列的大小可以分為三種:size為0的佇列有界佇列以及無界佇列

1.size為0的佇列

任務佇列大小為0,每次往任務佇列中插任務時都失敗,此時正在執行的執行緒小於maximumPoolSize時,則會建立新的執行緒直接執行任務,或者由空閒執行緒執行任務;否則會拒絕接受此任務,交給異常處理handler進行處理。

執行緒池的特點
1.執行緒池執行緒數量最多為maximumPoolSize個,最少為0個
2.提交執行緒池的任務不需要等待,當前執行緒小於corePoolSize時,直接建立新的執行緒執行;如果當前執行的執行緒數量等於或者大於maximumPoolSize時,則直接拒絕接受此任務,交給異常處理handler進行處理;
3.執行緒池最多同時執行maximumPoolSize個任務

下面我們來看一個例子

建立一個任務佇列大小為0的ThreadPoolExecutor,其中corePoolSize為1,maximumPoolSize為2

 public static ExecutorService synchronousQueueExecutor() {
    return new ThreadPoolExecutor(1, 2,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
}

建立3個任務Runnable,在開始執行與結束執行run()列印log,中間sleep() 1秒

    private static Runnable runnable1 = new Runnable() {
        @Override
        public void run() {
            Log.i("executor","runnable1 start");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Log.i("executor","runnable1 end");

        }
    };

    private static Runnable runnable2 = new Runnable() {
        @Override
        public void run() {
            Log.i("executor","runnable2 start");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Log.i("executor","runnable2 end");

        }
    };

    private static Runnable runnable3 = new Runnable() {

        public String getName() {
            return "runnable3";
        }

        @Override
        public void run() {
            Log.i("executor","runnable3 start");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Log.i("executor","runnable3 end");

        }
    };

設定RejectedExecutionHandler,並執行上面的3個任務

 private static RejectedExecutionHandler handler = new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            Log.i("executor","rejectedExecution");
        }
    };

    public static void testSynchronousQueueExecutor () {

        ExecutorService executor = ExecutorUtils.synchronousQueueExecutor();
        ((ThreadPoolExecutor)executor).setRejectedExecutionHandler(handler);
        executor.execute(runnable1);
        executor.execute(runnable2);
        executor.execute(runnable3);
    }

因為執行緒池的maximumPoolSize為2,任務佇列大小為0,當提交到第2個任務的時候,會拒絕接受任務,丟擲rejectedExecution,交給handler處理,我們來執行看看結果如何

 I/executor: rejectedExecution
 I/executor: runnable2 start
 I/executor: runnable1 start
 I/executor: runnable2 end
 I/executor: runnable1 end

從結果可以知道,runnable3沒有執行,並且丟擲了rejectedExecution,在handler中進行處理。

2.有界佇列
有界佇列,當正在執行的執行緒數量等於或者大於corePoolSize,每次提交任務時,都會往任務佇列中插入,若往任務佇列中插任務時失敗,此時正在執行的執行緒小於maximumPoolSize時,則會建立新的執行緒直接執行任務,或者由空閒執行緒執行任務;否則會拒絕接受此任務,交給異常處理handler進行處理。

執行緒池特點
1.執行緒池執行緒數量最多為maximumPoolSize個,最少為0個
2.提交執行緒池的任務可能需要等待,當前執行緒小於corePoolSize時,直接建立新的執行緒執行;如果當前執行的執行緒數量等於或者大於corePoolSize時,會插入任務佇列進行等待,當任務佇列滿了且當前正在執行的執行緒數量小於maximumPoolSize個,會建立新的執行緒直接執行,否則直接拒絕接受此任務,交給異常處理handler進行處理;
3.執行緒池最多同時執行 任務佇列大小 + maximumPoolSize 個任務

下面我們來看一個例子

首先建立一個有界任務佇列的執行緒池,任務佇列大小為3

 public static ExecutorService boundaryQueueExecutor() {
     return new ThreadPoolExecutor(1, 2,
                60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(3));
 }

建立10個runnable物件

    private static Runnable runnable1 = new Runnable() {
        @Override
        public void run() {
            Log.i("executor","runnable1 start");
        }
    };

    private static Runnable runnable2 = new Runnable() {
        @Override
        public void run() {
            Log.i("executor","runnable2 start");
        }
    };

    private static Runnable runnable3 = new Runnable() {

        @Override
        public void run() {
            Log.i("executor","runnable3 start");
        }
    };
    private static Runnable runnable4 = new Runnable() {

        @Override
        public void run() {
            Log.i("executor","runnable4 start");
        }
    };
    private static Runnable runnable5 = new Runnable() {

        @Override
        public void run() {
            Log.i("executor","runnable5 start");
        }
    };
    private static Runnable runnable6 = new Runnable() {

        @Override
        public void run() {
            Log.i("executor","runnable6 start");
        }
    };
    private static Runnable runnable7 = new Runnable() {

        @Override
        public void run() {
            Log.i("executor","runnable7 start");
        }
    };
    private static Runnable runnable8 = new Runnable() {

        @Override
        public void run() {
            Log.i("executor","runnable8 start");  
        }
    };
    private static Runnable runnable9 = new Runnable() {

        @Override
        public void run() {
            Log.i("executor","runnable9 start");
           }
    };
    private static Runnable runnable10 = new Runnable() {

        @Override
        public void run() {
            Log.i("executor","runnable10 start");
        }
    };

設定RejectedExecutionHandler,並執行上面的10個任務

 public static void testBoundaryQueueExecutor () {

        ExecutorService executor = ExecutorUtils.boundaryQueueExecutor();
        ((ThreadPoolExecutor)executor).setRejectedExecutionHandler(handler);
        executor.execute(runnable1);
        executor.execute(runnable2);
        executor.execute(runnable3);
        executor.execute(runnable4);
        executor.execute(runnable5);
        executor.execute(runnable6);
        executor.execute(runnable7);
        executor.execute(runnable8);
        executor.execute(runnable9);
        executor.execute(runnable10);
    }

因為我們的建立的執行緒池corePoolSize為1,maximumPoolSize為2,任務佇列大小為3,因此執行緒池最多隻能執行5個任務,有5個任務將會被拒絕,我們來看看執行結果如何

I/executor: rejectedExecution
I/executor: rejectedExecution
I/executor: rejectedExecution
I/executor: rejectedExecution
I/executor: rejectedExecution
I/executor: runnable5 start
I/executor: runnable2 start
I/executor: runnable3 start
I/executor: runnable4 start
I/executor: runnable1 start

說明結果還是和我們預期相符的

但是這裡有一點要注意的,執行的任務的順序是不一定的,有些部落格上說因為中間的任務加入佇列裡面等待而後面的任務因為任務佇列滿了執行建立執行緒執行,認為後面的任務比佇列中的任務先執行,這種說法是錯誤的;有兩點原因:1.後面的任務不一定比在佇列中的任務先建立執行緒,也有可能在為後面的任務建立執行緒的過程中,有正在執行的執行緒空閒下來了,執行佇列中的任務;2.即使後面的任務先建立執行緒,也不一定先執行,因為到底哪個執行緒先獲得CPU時間片進行執行也是不可確定的。

3.無界佇列
無界佇列,當正在執行的執行緒數量等於或者大於corePoolSize,每次提交任務時,都會往任務佇列中插入,因為任務佇列無限大,因此不會失敗。

執行緒池特點
1.執行緒池執行緒的最大數量為corePoolSize
2.執行緒池任務佇列無線大,線上程池RUNNING狀態下不會拒絕接受任務,丟擲rejectedExecution
3.當執行緒池的正在執行的執行緒數量等於corePoolSize時,任務都會插入到任務佇列

具體就不舉例子了,有興趣的可以自己去試試

Executors

JDK提供了Executors類,其中提供了建立ThreadPoolExecutor的靜態工廠方法,主要包含三類執行緒池的建立:newFixedThreadPool、newSingleThreadExecutor和newCachedThreadPool。

newFixedThreadPool

建立只有核心執行緒的執行緒池,其中任務佇列是無界佇列

 public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

newSingleThreadExecutor

建立只有一個核心執行緒的執行緒池,其中任務佇列為無界佇列,相當於是單執行緒執行任務佇列中的任務

  public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

newCachedThreadPool

建立沒有核心執行緒池、非核心執行緒池最多為Integer.MAX_VALUE個、任務佇列大小為0的執行緒池,相當於每加入一個任務建立一個執行緒執行任務,執行緒超時後會銷燬

   public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }