1. 程式人生 > >Java執行緒池實現原理與技術II

Java執行緒池實現原理與技術II

為了能夠更好地控制多執行緒,JDK提供了一套Executor框架,幫助開發人員有效地進行執行緒控制。Executor框架無論是newFixedThreadPool()方法、newSingleThreadExecutor()方法還是ewCachedThreadPool()方法,其內部實現均使用了 ThreadPoolExecutor:

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

由以上執行緒池的實現程式碼可以知道,它們只是對 ThreadPoolExecutor 類的封裝。為何 ThreadPoolExecutor 類有如此強大的功能?來看一下 ThreadPoolExecutor 最重要的構造方法。

1、構造方法

ThreadPoolExecutor最重要的構造方法如下:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

方法引數如下:

引數 說明
corePoolSize 指定了執行緒池中的執行緒數量
maximumPoolSize 指定了執行緒池中最大的執行緒數量
keepAliveTime 當執行緒池執行緒數量超過corePoolSize時,多餘的空閒執行緒的存活時間。 即,超過corePoolSize的空閒執行緒,在多長時間內會被銷燬
unit keepAliveTime 的單位,如:TimeUnit.SECONDS
workQueue 任務佇列,被提交但尚未被執行的任務。
threadFactory 執行緒工廠,用於建立執行緒,一般用預設的即可。
handler 拒絕策略。當任務太多來不及處理,如何拒絕任務。

ThreadPoolExecutor的使用示例,通過execute()方法提交任務。

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 5, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        for (int i = 0; i < 10; i++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName());
                }
            });
        }
        executor.shutdown();
    }

或者通過submit()方法提交任務

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 5, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        List<Future> futureList = new Vector<>();
        //在其它執行緒中執行100次下列方法
        for (int i = 0; i < 100; i++) {
            futureList.add(executor.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    return Thread.currentThread().getName();
                }
            }));
        }
        for (int i = 0;i<futureList.size();i++){
            Object o = futureList.get(i).get();
            System.out.println(o.toString());
        }
        executor.shutdown();
    }

執行結果:

...
pool-1-thread-4
pool-1-thread-3
pool-1-thread-2

下面主要講解ThreadPoolExecutor的構造方法中workQueue和RejectedExecutionHandler引數,其它引數都很簡單。

2、workQueue任務佇列

用於儲存等待執行的任務的阻塞佇列。可以選擇以下幾個阻塞佇列。

  • ArrayBlockingQueue: 是一個基於陣列結構的有界阻塞佇列,按FIFO原則進行排序

  • LinkedBlockingQueue: 一個基於連結串列結構的阻塞佇列,吞吐量高於ArrayBlockingQueue。靜態工廠方法Excutors.newFixedThreadPool()使用了這個佇列

  • SynchronousQueue: 一個不儲存元素的阻塞佇列。每個插入操作必須等到另一個執行緒呼叫移除操作,否則插入操作一直處於阻塞狀態,吞吐量高於LinkedBlockingQueue,靜態工廠方法Excutors.newCachedThreadPool()使用了這個佇列

  • PriorityBlockingQueue: 一個具有優先順序的無限阻塞佇列。

3、RejectedExecutionHandler飽和策略

當佇列和執行緒池都滿了,說明執行緒池處於飽和狀態,那麼必須採取一種策略還處理新提交的任務。它可以有如下四個選項:

  • AbortPolicy : 直接丟擲異常,預設情況下采用這種策略

  • CallerRunsPolicy : 只用呼叫者所線上程來執行任務

  • DiscardOldestPolicy : 丟棄佇列裡最近的一個任務,並執行當前任務

  • DiscardPolicy : 不處理,丟棄掉

更多的時候,我們應該通過實現RejectedExecutionHandler 介面來自定義策略,比如記錄日誌或持久化儲存等。

4、submit()與execute()

可以使用execute和submit兩個方法向執行緒池提交任務。

  1. execute方法用於提交不需要返回值的任務,利用這種方式提交的任務無法得知是否正常執行

  2. submit方法用於提交一個任務並帶有返回值,這個方法將返回一個Future型別物件。可以通過這個返回物件判斷任務是否執行成功,並且可以通過future.get()方法來獲取返回值,get()方法會阻塞當前執行緒直到任務完成。

5、shutdown()與shutdownNow()

可以通過呼叫 shutdown() 或 shutdownNow() 方法來關閉執行緒池。它們的原理是遍歷執行緒池中的工作執行緒,然後逐個呼叫執行緒的 interrupt 方法來中斷執行緒,所以無法響應中斷的任務可能永遠無法停止。

這倆方法的區別是,shutdownNow() 首先將執行緒池的狀態設定成STOP,然後嘗試停止所有的正在執行或暫停任務的執行緒,並返回等待執行任務的列表,而 shutdown() 只是將執行緒池的狀態設定成 SHUTDOWN 狀態,然後中斷所有沒有正在執行任務的執行緒。

只要呼叫了這兩個關閉方法的任意一個,isShutdown 方法就會返回 true。當所有的任務都已關閉了,才表示執行緒池關閉成功,這時呼叫 isTerminaced 方法會返回 true。

通常呼叫 shutdown() 方法來關閉執行緒池,如果任務不一定要執行完,則可以呼叫 shutdownNow() 方法。

6、合理配置執行緒池

要想合理地配置執行緒池,首先要分析任務特性

  • 任務的性質:CPU密集型任務、IO密集型任務和混合型任務。

  • 任務的優先順序:高、中和低。

  • 任務的執行時間:長、中和短。

  • 任務的依賴性:是否依賴其他系統資源,如資料庫連線。

性質不同的任務可以用不同規模的執行緒池分開處理。

CPU密集型任務應該配置儘可能少的執行緒,如配置N+1個執行緒,N位CPU的個數。

而IO密集型任務執行緒並不是一直在執行任務,則應配置儘可能多的執行緒,如2*N。

混合型任務,如果可以拆分,將其拆分成一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那麼分解後執行的吞吐量將高於序列執行的吞吐量。如果這兩個任務執行的時間相差很大,則沒有必要進行分解。可以通過Runtime.getRuntime().availableProcessors()方法獲得當前裝置的CPU個數。

優先順序不同的任務可以使用優先順序佇列PriorityBlockingQueue來處理。它可以讓優先順序高的任務先執行。

7、執行緒池的監控

由於大量的使用執行緒池,所以很有必要對其進行監控。可以通過繼承執行緒池來自定義執行緒池,重寫執行緒池的beforeExecute、afterExecute 和 terminated 方法,也可以在任務執行前,執行後和執行緒池關閉前執行一些程式碼來進行監控。在監控執行緒池的時候可以使用一下屬性:

(1) taskCount:執行緒池需要執行的任務數量

(2) completedTaskCount:執行緒池在執行過程中已完成的任務數量,小於或等於taskCount

(3) largestPoolSize: 執行緒池裡曾經建立過最大的執行緒數量。通過這個資料可以知道執行緒池是否曾經滿過。如該數值等於執行緒池最大大小,則表示執行緒池曾經滿過。

(4) getPoolSize:執行緒池的執行緒數量。如果執行緒池不銷燬的話,執行緒池裡的執行緒不會自動銷燬,所以這個大小隻增不減。

(5) getActiveCount:獲取活動的執行緒數