1. 程式人生 > >執行緒池深度好文(轉載)

執行緒池深度好文(轉載)

轉自:https://www.cnblogs.com/baizhanshi/p/5469948.html

1、new Thread的弊端

執行一個非同步任務你還只是如下new Thread嗎?

Java   ?
1 2 3 4 5 6 7 new Thread( new Runnable() {    @Override public void run() {
// TODO Auto-generated method stub } }).start();

那你就out太多了,new Thread的弊端如下:

a. 每次new Thread新建物件效能差。
b. 執行緒缺乏統一管理,可能無限制新建執行緒,相互之間競爭,及可能佔用過多系統資源導致宕機或oom。
c. 缺乏更多功能,如定時執行、定期執行、執行緒中斷。
相比new Thread,Java提供的四種執行緒池的好處在於:
a. 重用存在的執行緒,減少物件建立、消亡的開銷,效能佳。
b. 可有效控制最大併發執行緒數,提高系統資源的使用率,同時避免過多資源競爭,避免堵塞。
c. 提供定時執行、定期執行、單執行緒、併發數控制等功能。

 

2、Java 執行緒池
Java通過Executors提供四種執行緒池,分別為:
newCachedThreadPool建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒。
newFixedThreadPool 建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。
newScheduledThreadPool 建立一個定長執行緒池,支援定時及週期性任務執行。
newSingleThreadExecutor 建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。

(1). newCachedThreadPool
建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒。示例程式碼如下:

Java   ?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); for ( int i = 0 ; i < 10 ; i++) { final int index = i; try { Thread.sleep(index * 1000 ); } catch (InterruptedException e) { e.printStackTrace(); }    cachedThreadPool.execute( new Runnable() {    @Override public void run() { System.out.println(index); } }); }

執行緒池為無限大,當執行第二個任務時第一個任務已經完成,會複用執行第一個任務的執行緒,而不用每次新建執行緒。

(2). newFixedThreadPool
建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。示例程式碼如下:

Java   ?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 ExecutorService fixedThreadPool = Executors.newFixedThreadPool( 3 ); for ( int i = 0 ; i < 10 ; i++) { final int index = i; fixedThreadPool.execute( new Runnable() {    @Override public void run() { try { System.out.println(index); Thread.sleep( 2000 ); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); }

  

因為執行緒池大小為3,每個任務輸出index後sleep 2秒,所以每兩秒列印3個數字。

定長執行緒池的大小最好根據系統資源進行設定。如Runtime.getRuntime().availableProcessors()。可參考PreloadDataCache

 

(3) newScheduledThreadPool
建立一個定長執行緒池,支援定時及週期性任務執行。延遲執行示例程式碼如下:

Java ?
1 2 3 4 5 6 7 8 ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool( 5 ); scheduledThreadPool.schedule( new Runnable() {    @Override public void run() { System.out.println( "delay 3 seconds" ); } }, 3 , TimeUnit.SECONDS);

表示延遲3秒執行。

定期執行示例程式碼如下:

Java ?
1 2 3 4 5 6 7 scheduledThreadPool.scheduleAtFixedRate( new Runnable() {    @Override public void run() { System.out.println( "delay 1 seconds, and excute every 3 seconds" ); } }, 1 , 3 , TimeUnit.SECONDS);

 

表示延遲1秒後每3秒執行一次。

ScheduledExecutorService比Timer更安全,功能更強大,後面會有一篇單獨進行對比。

 

(4)、newSingleThreadExecutor
建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。示例程式碼如下:

Java   ?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); for ( int i = 0 ; i < 10 ; i++) { final int index = i; singleThreadExecutor.execute( new Runnable() {    @Override public void run() { try { System.out.println(index); Thread.sleep( 2000 ); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); }

結果依次輸出,相當於順序執行各個任務。

現行大多數GUI程式都是單執行緒的。Android中單執行緒可用於資料庫操作,檔案操作,應用批量安裝,應用批量刪除等不適合併發但可能IO阻塞性及影響UI執行緒響應的操作。

 

執行緒池的作用:

執行緒池作用就是限制系統中執行執行緒的數量。
     根 據系統的環境情況,可以自動或手動設定執行緒數量,達到執行的最佳效果;少了浪費了系統資源,多了造成系統擁擠效率不高。用執行緒池控制執行緒數量,其他執行緒排 隊等候。一個任務執行完畢,再從佇列的中取最前面的任務開始執行。若佇列中沒有等待程序,執行緒池的這一資源處於等待。當一個新任務需要執行時,如果執行緒池 中有等待的工作執行緒,就可以開始運行了;否則進入等待佇列。

為什麼要用執行緒池:

1.減少了建立和銷燬執行緒的次數,每個工作執行緒都可以被重複利用,可執行多個任務。

2.可以根據系統的承受能力,調整執行緒池中工作線執行緒的數目,防止因為消耗過多的記憶體,而把伺服器累趴下(每個執行緒需要大約1MB記憶體,執行緒開的越多,消耗的記憶體也就越大,最後宕機)。

Java裡面執行緒池的頂級介面是Executor,但是嚴格意義上講Executor並不是一個執行緒池,而只是一個執行執行緒的工具。真正的執行緒池介面是ExecutorService。

比較重要的幾個類:

ExecutorService

真正的執行緒池介面。

ScheduledExecutorService

能和Timer/TimerTask類似,解決那些需要任務重複執行的問題。

ThreadPoolExecutor

ExecutorService的預設實現。

ScheduledThreadPoolExecutor

繼承ThreadPoolExecutor的ScheduledExecutorService介面實現,週期性任務排程的類實現。

要配置一個執行緒池是比較複雜的,尤其是對於執行緒池的原理不是很清楚的情況下,很有可能配置的執行緒池不是較優的,因此在Executors類裡面提供了一些靜態工廠,生成一些常用的執行緒池。

1. newSingleThreadExecutor

建立一個單執行緒的執行緒池。這個執行緒池只有一個執行緒在工作,也就是相當於單執行緒序列執行所有任務。如果這個唯一的執行緒因為異常結束,那麼會有一個新的執行緒來替代它。此執行緒池保證所有任務的執行順序按照任務的提交順序執行。

2.newFixedThreadPool

建立固定大小的執行緒池。每次提交一個任務就建立一個執行緒,直到執行緒達到執行緒池的最大大小。執行緒池的大小一旦達到最大值就會保持不變,如果某個執行緒因為執行異常而結束,那麼執行緒池會補充一個新執行緒。

3. newCachedThreadPool

建立一個可快取的執行緒池。如果執行緒池的大小超過了處理任務所需要的執行緒,

那麼就會回收部分空閒(60秒不執行任務)的執行緒,當任務數增加時,此執行緒池又可以智慧的新增新執行緒來處理任務。此執行緒池不會對執行緒池大小做限制,執行緒池大小完全依賴於作業系統(或者說JVM)能夠建立的最大執行緒大小。

4.newScheduledThreadPool

建立一個大小無限的執行緒池。此執行緒池支援定時以及週期性執行任務的需求。

例項

1:newSingleThreadExecutor

MyThread.java

publicclassMyThread extends Thread {

    @Override

    publicvoid run() {

        System.out.println(Thread.currentThread().getName() + "正在執行。。。");

    }

}

TestSingleThreadExecutor.java

publicclassTestSingleThreadExecutor {

    publicstaticvoid main(String[] args) {

        //建立一個可重用固定執行緒數的執行緒池

        ExecutorService pool = Executors. newSingleThreadExecutor();

        //建立實現了Runnable介面物件,Thread物件當然也實現了Runnable介面

        Thread t1 = new MyThread();

        Thread t2 = new MyThread();

        Thread t3 = new MyThread();

        Thread t4 = new MyThread();

        Thread t5 = new MyThread();

        //將執行緒放入池中進行執行

        pool.execute(t1);

        pool.execute(t2);

        pool.execute(t3);

        pool.execute(t4);

        pool.execute(t5);

        //關閉執行緒池

        pool.shutdown();

    }

}

輸出結果

pool-1-thread-1正在執行。。。

pool-1-thread-1正在執行。。。

pool-1-thread-1正在執行。。。

pool-1-thread-1正在執行。。。

pool-1-thread-1正在執行。。。

2newFixedThreadPool

TestFixedThreadPool.Java

publicclass TestFixedThreadPool {

    publicstaticvoid main(String[] args) {

        //建立一個可重用固定執行緒數的執行緒池

        ExecutorService pool = Executors.newFixedThreadPool(2);

        //建立實現了Runnable介面物件,Thread物件當然也實現了Runnable介面

        Thread t1 = new MyThread();

        Thread t2 = new MyThread();

        Thread t3 = new MyThread();

        Thread t4 = new MyThread();

        Thread t5 = new MyThread();

        //將執行緒放入池中進行執行

        pool.execute(t1);

        pool.execute(t2);

        pool.execute(t3);

        pool.execute(t4);

        pool.execute(t5);

        //關閉執行緒池

        pool.shutdown();

    }

}

輸出結果

pool-1-thread-1正在執行。。。

pool-1-thread-2正在執行。。。

pool-1-thread-1正在執行。。。

pool-1-thread-2正在執行。。。

pool-1-thread-1正在執行。。。

3 newCachedThreadPool

TestCachedThreadPool.java

publicclass TestCachedThreadPool {

    publicstaticvoid main(String[] args) {

        //建立一個可重用固定執行緒數的執行緒池

        ExecutorService pool = Executors.newCachedThreadPool();

        //建立實現了Runnable介面物件,Thread物件當然也實現了Runnable介面

        Thread t1 = new MyThread();

        Thread t2 = new MyThread();

        Thread t3 = new MyThread();

        Thread t4 = new MyThread();

        Thread t5 = new MyThread();

        //將執行緒放入池中進行執行

        pool.execute(t1);

        pool.execute(t2);

        pool.execute(t3);

        pool.execute(t4);

        pool.execute(t5);

        //關閉執行緒池

        pool.shutdown();

    }

}

輸出結果:

pool-1-thread-2正在執行。。。

pool-1-thread-4正在執行。。。

pool-1-thread-3正在執行。。。

pool-1-thread-1正在執行。。。

pool-1-thread-5正在執行。。。

4newScheduledThreadPool

TestScheduledThreadPoolExecutor.java

publicclass TestScheduledThreadPoolExecutor {

    publicstaticvoid main(String[] args) {

        ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(1);

        exec.scheduleAtFixedRate(new Runnable() {//每隔一段時間就觸發異常

                      @Override

                      publicvoid run() {

                           //throw new RuntimeException();

                           System.out.println("================");

                      }

                  }, 1000, 5000, TimeUnit.MILLISECONDS);

        exec.scheduleAtFixedRate(new Runnable() {//每隔一段時間列印系統時間,證明兩者是互不影響的

                      @Override

                      publicvoid run() {

                           System.out.println(System.nanoTime());

                      }

                  }, 1000, 2000, TimeUnit.MILLISECONDS);

    }

}

輸出結果

 

================

8384644549516

8386643829034

8388643830710

================

8390643851383

8392643879319

8400643939383

無論建立那種執行緒池 必須要呼叫ThreadPoolExecutor

執行緒池類為 java.util.concurrent.ThreadPoolExecutor,常用構造方法為: 

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, 
long keepAliveTime, TimeUnit unit, 
BlockingQueue workQueue, 
RejectedExecutionHandler handler) 
corePoolSize: 執行緒池維護執行緒的最少數量 
maximumPoolSize:執行緒池維護執行緒的最大數量 
keepAliveTime: 執行緒池維護執行緒所允許的空閒時間 
unit: 執行緒池維護執行緒所允許的空閒時間的單位 
workQueue: 執行緒池所使用的緩衝佇列 
handler: 執行緒池對拒絕任務的處理策略 

一個任務通過 execute(Runnable)方法被新增到執行緒池,任務就是一個 Runnable型別的物件,任務的執行方法就是 Runnable型別物件的run()方法。 

當一個任務通過execute(Runnable)方法欲新增到執行緒池時: 

如果此時執行緒池中的數量小於corePoolSize,即使執行緒池中的執行緒都處於空閒狀態,也要建立新的執行緒來處理被新增的任務。 
如果此時執行緒池中的數量等於 corePoolSize,但是緩衝佇列 workQueue未滿,那麼任務被放入緩衝佇列。 
如果此時執行緒池中的數量大於corePoolSize,緩衝佇列workQueue滿,並且執行緒池中的數量小於maximumPoolSize,建新的執行緒來處理被新增的任務。 
如果此時執行緒池中的數量大於corePoolSize,緩衝佇列workQueue滿,並且執行緒池中的數量等於maximumPoolSize,那麼通過 handler所指定的策略來處理此任務。 

也就是:處理任務的優先順序為: 
核心執行緒corePoolSize、任務佇列workQueue、最大執行緒maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務。 

當執行緒池中的執行緒數量大於 corePoolSize時,如果某執行緒空閒時間超過keepAliveTime,執行緒將被終止。這樣,執行緒池可以動態的調整池中的執行緒數。 

unit可選的引數為java.util.concurrent.TimeUnit中的幾個靜態屬性: 
NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。 

workQueue我常用的是:java.util.concurrent.ArrayBlockingQueue 

handler有四個選擇: 
ThreadPoolExecutor.AbortPolicy() 
丟擲java.util.concurrent.RejectedExecutionException異常 
ThreadPoolExecutor.CallerRunsPolicy() 
重試添加當前的任務,他會自動重複呼叫execute()方法 
ThreadPoolExecutor.DiscardOldestPolicy() 
拋棄舊的任務 
ThreadPoolExecutor.DiscardPolicy() 
拋棄當前的任務 

 

當然也可以根據應用場景實現RejectedExecutionHandler介面,自定義飽和策略,如記錄日誌或持久化儲存不能處理的任務。

Executor 可 以 創 建 3 種 類 型 的 ThreadPoolExecutor 線 程 池:

 1. FixedThreadPool

建立固定長度的執行緒池,每次提交任務建立一個執行緒,直到達到執行緒池的最大數量,執行緒池的大小不再變化。

這個執行緒池可以建立固定執行緒數的執行緒池。特點就是可以重用固定數量執行緒的執行緒池。它的構造原始碼如下:

1 2 3 4 5
public  static  ExecutorService newFixedThreadPool( int  nThreads) {           return  new  ThreadPoolExecutor(nThreads, nThreads, 0L,                                        TimeUnit.MILLISECONDS,                                         new  LinkedBlockingQueue<Runnable>()); 
  • FixedThreadPool的corePoolSize和maxiumPoolSize都被設定為建立FixedThreadPool時指定的引數nThreads。
  • 0L則表示當執行緒池中的執行緒數量操作核心執行緒的數量時,多餘的執行緒將被立即停止
  • 最後一個引數表示FixedThreadPool使用了無界佇列LinkedBlockingQueue作為執行緒池的做工佇列,由於是無界的,當執行緒池的執行緒數達到corePoolSize後,新任務將在無界佇列中等待,因此執行緒池的執行緒數量不會超過corePoolSize,同時maxiumPoolSize也就變成了一個無效的引數,並且執行中的執行緒池並不會拒絕任務。

FixedThreadPool執行圖如下

執行過程如下:

1.如果當前工作中的執行緒數量少於corePool的數量,就建立新的執行緒來執行任務。

2.當執行緒池的工作中的執行緒數量達到了corePool,則將任務加入LinkedBlockingQueue。

3.執行緒執行完1中的任務後會從佇列中去任務。

注意LinkedBlockingQueue是無界佇列,所以可以一直新增新任務到執行緒池。

 

2. SingleThreadExecutor  

SingleThreadExecutor是使用單個worker執行緒的Executor。特點是使用單個工作執行緒執行任務。它的構造原始碼如下:

1 2 3 4 5 6
public  static  ExecutorService newSingleThreadExecutor() {          return  new  FinalizableDelegatedExecutorService              ( new  ThreadPoolExecutor( 1 1 ,                                      0L, TimeUnit.MILLISECONDS,                                      new  LinkedBlockingQueue<Runnable>())); }

  

SingleThreadExecutor的corePoolSize和maxiumPoolSize都被設定1。 其他引數均與FixedThreadPool相同,其執行圖如下:

 

執行過程如下:

1.如果當前工作中的執行緒數量少於corePool的數量,就建立一個新的執行緒來執行任務。

2.當執行緒池的工作中的執行緒數量達到了corePool,則將任務加入LinkedBlockingQueue。

3.執行緒執行完1中的任務後會從佇列中去任務。

注意:由於線上程池中只有一個工作執行緒,所以任務可以按照新增順序執行。

 

 3. CachedThreadPool

 CachedThreadPool是一個”無限“容量的執行緒池,它會根據需要建立新執行緒。特點是可以根據需要來建立新的執行緒執行任務,沒有特定的corePool。下面是它的構造方法:

1 2 3 4 5
public  static  ExecutorService newCachedThreadPool() {          return  new  ThreadPoolExecutor( 0 , Integer.MAX_VALUE,                                        60L, TimeUnit.SECONDS,                                        new  SynchronousQueue<Runnable>()); }

  

CachedThreadPool的corePoolSize被設定為0,即corePool為空;maximumPoolSize被設定為Integer.MAX_VALUE,即maximum是無界的。這裡keepAliveTime設定為60秒,意味著空閒的執行緒最多可以等待任務60秒,否則將被回收。   CachedThreadPool使用 沒有容量的SynchronousQueue作為主執行緒池的工作佇列,它是一個沒有容量的阻塞佇列。每個插入操作必須等待另一個執行緒的對應移除操作。這意味著,如果主執行緒提交任務的速度高於執行緒池中處理任務的速度時,CachedThreadPool會不斷建立新執行緒。極端情況下,CachedThreadPool會因為建立過多執行緒而耗盡CPU資源。其執行圖如下:

 

執行過程如下:

1.首先執行SynchronousQueue.offer(Runnable task)。如果在當前的執行緒池中有空閒的執行緒正在執行SynchronousQueue.poll(),那麼主執行緒執行的offer操作與空閒執行緒執行的poll操作配對成功,主執行緒把任務交給空閒執行緒執行。,execute()方法執行成功,否則執行步驟2

2.當執行緒池為空(初始maximumPool為空)或沒有空閒執行緒時,配對失敗,將沒有執行緒執行SynchronousQueue.poll操作。這種情況下,執行緒池會建立一個新的執行緒執行任務。

3.在建立完新的執行緒以後,將會執行poll操作。當步驟2的執行緒執行完成後,將等待60秒,如果此時主執行緒提交了一個新任務,那麼這個空閒執行緒將執行新任務,否則被回收。因此長時間不提交任務的CachedThreadPool不會佔用系統資源。

SynchronousQueue是一個不儲存元素阻塞佇列,每次要進行offer操作時必須等待poll操作,否則不能繼續新增元素。

 

最後 來個各種阻塞佇列的說明和比較:

  Java併發包中的阻塞佇列一共7個,當然他們都是執行緒安全的。 

  ArrayBlockingQueue:一個由陣列結構組成的有界阻塞佇列。 

  LinkedBlockingQueue:一個由連結串列結構組成的有界阻塞佇列。 

  PriorityBlockingQueue:一個支援優先順序排序的無界阻塞佇列。 

  DealyQueue:一個使用優先順序佇列實現的無界阻塞佇列。 

  SynchronousQueue:一個不儲存元素的阻塞佇列。 

  LinkedTransferQueue:一個由連結串列結構組成的無界阻塞佇列。 

  LinkedBlockingDeque:一個由連結串列結構組成的雙向阻塞佇列。