1. 程式人生 > >併發:執行緒池的使用。

併發:執行緒池的使用。

執行緒池的建立

我們可以通過ThreadPoolExecutor來建立一個執行緒池。

new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, milliseconds, runnableTaskQueue, handler);

建立一個執行緒池時需要輸入幾個引數。如下。

corePoolSize(執行緒池的基本大小):當提交一個任務到執行緒池時,執行緒池會建立一個執行緒來執行任務,即使其他空閒的基本執行緒能夠執行新任務也會建立執行緒,等到需要執行的任務數大於執行緒池基本大小時就不再建立。如果呼叫了執行緒池的prestartAllCoreThreads()方法,執行緒池會提前建立並啟動所有基本執行緒。

runnableTaskQueue(任務佇列):用於儲存等待執行的任務的阻塞佇列。可以選擇以下幾個阻塞佇列。

  • ArrayBlockingQueue:是一個基於陣列結構的有界阻塞佇列,此佇列按FIFO(先進先出)原則對元素進行排序。
  • LinkedBlockingQueue:一個基於連結串列結構的阻塞佇列,此佇列按FIFO排序元素,吞吐量通常高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個佇列。
  • SynchronousQueue:一個不儲存元素的阻塞佇列。每個插入操作必須等到另一個執行緒呼叫移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於LinkedBlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個佇列。
  • PriorityBlockingQueue:一個具有優先順序的無限阻塞佇列。

maximumPoolSize(執行緒池最大數量):執行緒池允許建立的最大執行緒數。如果佇列滿了,並且已建立的執行緒數小於最大執行緒數,則執行緒池會再建立新的執行緒執行任務。值得注意的是,如果使用了無界的任務佇列這個引數就沒什麼效果。

ThreadFactory:用於設定建立執行緒的工廠,可以通過執行緒工廠給每個創建出來的執行緒設定更有意義的名字。使用開源框架guava提供的ThreadFactoryBuilder可以快速給執行緒池裡的執行緒設定有意義的名字,程式碼如下。

new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();

RejectedExecutionHandler(飽和策略):當佇列和執行緒池都滿了,說明執行緒池處於飽和狀態,那麼必須採取一種策略處理提交的新任務。這個策略預設情況下是AbortPolicy,表示無法處理新任務時丟擲異常。在JDK 1.5中Java執行緒池框架提供了以下4種策略。

  • AbortPolicy:直接丟擲異常。
  • CallerRunsPolicy:只用呼叫者所線上程來執行任務。
  • DiscardOldestPolicy:丟棄佇列裡最近的一個任務,並執行當前任務。
  • DiscardPolicy:不處理,丟棄掉。

當然也可以根據應用場景需要來實現RejectedExecutionHandler介面自定義策略。如記錄日誌或持久化儲存不能處理的任務。

keepAliveTime(執行緒活動保持時間):執行緒池的工作執行緒空閒後,保持存活的時間。所以,如果任務很多,並且每個任務執行的時間短,可以調大時間,提高執行緒的利用率。

TimeUnit(執行緒活動保持時間的單位):可選的單位有天(DAYS)、小時(HOURS)、分鐘(MINUTES)、毫秒(MILLISECONDS)、微妙(MICROSECONDS,千分之一毫秒)和納秒(NANOSECONDS,千分之一微妙)。

向執行緒池提交任務

可以使用兩個方法向執行緒池提交任務,分別為execute()和submit()方法。

execute()方法用於提交不需要返回值的任務,所以無法判斷任務是否被執行緒池執行成功。通過以下程式碼可知execute()方法輸入的任務是一個Runnable類的例項。

threadsPool.execute(new Runnable() {
    @Override
    public void run() {
        // doSomething
    ]
});

submit()方法用於提交需要返回值的任務。執行緒池會返回一個future型別的物件,通過這個future物件可以判斷任務是否執行成功,並且可以通過future的get()方法來獲取返回值,get()方法會阻塞當前執行緒直到任務完成,而使用get(long timeout, TimeUnit unit)則會阻塞當前執行緒一段時間後立即返回,這時候有可能任務沒有執行完。

Future<Object> future = executor.submit(harReturnValuetask);
try {
    Object s = future.get();
} catch(InterruptedException e) {
    // 處理中斷異常
} catch (ExecutionException e) {
    // 處理無法執行任務異常
} finally {
    // 關閉執行緒池
    executor.shutdown();
}

關閉執行緒池

可以通過呼叫執行緒池的shutdown或shutdownNow方法來關閉執行緒池。他們的原理是遍歷執行緒池中的工作執行緒,然後逐個呼叫執行緒的interrupt方法來中斷執行緒,所以無法響應中斷的任務可能永遠無法終止。但是他們存在一定的區別,shutdownNow首先將執行緒池的狀態設定成STOP,然後嘗試停止所有的正在執行或暫停任務的執行緒,並返回等待執行任務的列表,而shutdown只是將執行緒池的狀態設定成SHUTDOWN狀態,然後中斷所有沒有正在執行任務的執行緒。

只要呼叫了這兩個關閉方法中的任意一個,isShutdown方法就會返回true。當所有的任務都已關閉後,才表示執行緒池關閉成功,這時呼叫isTerminaed方法會返回true。至於應該呼叫哪一種方法來關閉執行緒池,應該由提交到執行緒池的任務特性決定,通常呼叫shutdown方法來關閉執行緒池,如果任務不一定要執行完,則可以呼叫shutdownNow方法。

合理的配置執行緒池

要想合理的配置執行緒池,就必須首先分析任務特性,可以從以下幾個角度來分析。

  • 任務的性質:CPU密集型任務、IO密集型任務和混合型任務。
  • 任務的優先順序:高、中和低。
  • 任務的執行時長:長、中和短。
  • 任務的依賴性:是否依賴其他系統資源,如資料庫連線。

性質不同的任務可以用不同規模的執行緒池分開處理。CPU密集型任務應配置儘可能小的執行緒,如配置Ncpu+1個執行緒的執行緒池。由於IO密集型任務執行緒並不是一直在執行任務,則應配置儘可能多的執行緒,如2*Ncpu。混合型的任務,如果可以拆分,將其拆分成一個CPU密集型任務和一個IO密集型任務,只要這兩個任務指定的時間相差不是太大,那麼分解後執行的吞吐量將高於序列執行的吞吐量。如果這兩個任務執行時間相差太大,則沒必要進行分解。可以通過Runtime.getRuntime().availableProcessors()方法獲得當前裝置的CPU個數。

優先順序不同的任務可以使用優先順序佇列PriorityBlockingQueue來處理。他可以讓優先順序高的任務先執行。

注意:如果一直有優先順序高的任務提交到佇列裡,那麼優先順序低的任務可能永遠不能執行。

執行時間不同的任務可以交給不同規模的的執行緒池來處理,或者可以使用優先順序佇列,讓執行時間短的任務先執行。
依賴資料庫連線池的任務,因為執行緒提交SQL後需要等待資料庫返回結果,等待的時間越長,則CPU空閒時間就越長,那麼執行緒數應該設定的越大,這樣才能更好的利用CPU。

建議使用有界佇列。有界佇列能增加系統的穩定性和預警能力,可以根據需要設大一點兒,比如幾千。有一次,我們系統裡後臺任務執行緒池的佇列和執行緒池全滿了,不斷丟擲拋棄任務的異常,通過排查發現是資料庫出現了問題,導致執行SQL變得非常緩慢,因為後臺任務執行緒池裡的任務全是需要向資料庫查詢和插入資料的,所以導致執行緒池裡的工作執行緒全部阻塞,任務積壓線上程池裡。如果當時我們設定成無界佇列,那麼執行緒池的佇列就會越來越多,有可能會撐滿記憶體,導致整個系統不可用,而不只是後臺任務出現問題。當然,我們的系統所有的任務是用單獨的伺服器部署的,我們使用不同規模的執行緒池完成不同型別的任務,但是出現這樣我呢提時與也會影響到其他任務。

執行緒池的監控

如果在系統中大量使用執行緒池,則有必要對執行緒池進行監控,方便在出現我呢提時,可以根據執行緒池的使用狀況快速定位問題。可以通過執行緒池提供的引數進行監控,在監控執行緒池的時候可以使用以下屬性。

  • taskCount:執行緒池需要執行的任務數量。
  • completedTaskCount:執行緒池在執行過程中已完成的任務數量,小於或等於taskCount。
  • largestPoolSize:執行緒池裡曾經建立過最大執行緒數量。通過這個資料可以知道執行緒池是否曾經滿過。如該數值等於執行緒池的最大大小,則表示執行緒池曾經滿過。
  • getPoolSize:執行緒池的執行緒數量。如果執行緒池不銷燬的話,執行緒池裡的執行緒不會自動銷燬,所以這個大小隻增不減。
  • getActiveCount:獲取活動的執行緒數。

通過擴充套件執行緒池進行監控,可以通過繼承執行緒池來自定義執行緒池,重寫執行緒池的beforeExecute、afterExecute和terminated方法,也可以在任務執行前、執行後和執行緒池關閉前執行一些程式碼來進行監控。例如,監控任務的平均執行時間、最大執行時間和最小執行時間等。這幾個方法線上程池裡是空方法。

protected void beforeExecute(Thread t, Runnable r){}