1. 程式人生 > >Java高併發程式設計:執行緒池

Java高併發程式設計:執行緒池

這裡首先介紹了java5中的併發的小工具包:java.util.concurrent.atomic,然後介紹了執行緒池的概念,對使用java5的方式建立不同形式的執行緒進行了演示,之後介紹了兩個 物件:Callable和Future,用於獲取執行緒執行後的結果,對於執行緒鎖技術則在另外一篇文章中介紹。

Java5中的執行緒併發庫都在java.util.concurrent包及子包中

1. Executor類的繼承結構

ThreadPoolExecutor

Executor是執行緒池的頂級介面,只有一個執行任務的方法execute()

ExecutorService是Executor的子介面,該介面中包含了執行緒池常用的一些方法

方法 功能描述
execute() 執行任務
shutdown() 呼叫後不再接收新任務,如果裡面有任務,就執行完
shutdownNow() 呼叫後不再接受新任務,如果有等待任務,移出佇列;有正在執行的,嘗試停止之
isShutdown() 判斷執行緒池是否關閉
isTerminated() 判斷執行緒池中任務是否執行完成
submit() 提交任務
invokeAll() 執行一組任務

2. ThreadPoolExecutor

ExecutorService的預設實現,同時也是Executors的底層實現

2.1 構造方法

public ThreadPoolExecutor(
    int corePoolSize, //核心執行緒數
    int maximumPoolSize, //最大執行緒數
    long keepAliveTime, //保持時間
    TimeUnit unit, //時間單位
    BlockingQueue<Runnable> workQueue, //阻塞佇列
    ThreadFactory threadFactory, //執行緒工廠
    RejectedExecutionHandler handler //異常捕獲器
)

2.1.1 int corePoolSize

核心池的大小,這個引數跟後面講述的執行緒池的實現原理有非常大的關係。在建立了執行緒池後,預設情況下,執行緒池中並沒有任何執行緒,而是等待有任務到來才建立執行緒去執行任務,除非呼叫了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字就可以看出,是預建立執行緒的意思,即在沒有任務到來之前就建立corePoolSize個執行緒或者一個執行緒。預設情況下,在建立了執行緒池後,執行緒池中的執行緒數為0,當有任務來之後,就會建立一個執行緒去執行任務,當執行緒池中的執行緒數目達到corePoolSize後,就會把到達的任務放到快取隊列當中

2.1.2 int maximumPoolSize

執行緒池最大執行緒數,這個引數也是一個非常重要的引數,它表示線上程池中最多能建立多少個執行緒

2.1.3 long keepAliveTime

表示執行緒沒有任務執行時最多保持多久時間會終止。預設情況下,只有當執行緒池中的執行緒數大於corePoolSize時,keepAliveTime才會起作用,直到執行緒池中的執行緒數不大於corePoolSize,即當執行緒池中的執行緒數大於corePoolSize時,如果一個執行緒空閒的時間達到keepAliveTime,則會終止,直到執行緒池中的執行緒數不超過corePoolSize。但是如果呼叫了allowCoreThreadTimeOut(boolean)方法,線上程池中的執行緒數不大於corePoolSize時,keepAliveTime引數也會起作用,直到執行緒池中的執行緒數為0

2.1.4 TimeUnit unit

引數keepAliveTime的時間單位,有7種取值

  • TimeUnit.DAYS //天
  • TimeUnit.HOURS //小時
  • TimeUnit.MINUTES //分鐘
  • TimeUnit.SECONDS //秒
  • TimeUnit.MILLISECONDS //毫秒
  • TimeUnit.MICROSECONDS //微妙
  • TimeUnit.NANOSECONDS //納秒

2.1.5 RejectedExecutionHandler

  • ThreadPoolExecutor.AbortPolicy
    當新增任務出錯時的策略捕獲器,丟棄任務並丟擲RejectedExecutionException異常

  • ThreadPoolExecutor.DiscardPolicy
    也是丟棄任務,但是不丟擲異常

  • ThreadPoolExecutor.DiscardOldestPolicy
    丟棄佇列最前面的任務,然後重新嘗試執行任務(重複此過程)

  • ThreadPoolExecutor.CallerRunsPolicy
    由呼叫執行緒處理該任務

3. 任務提交給執行緒池之後的處理策略

1、如果當前執行緒池中的執行緒數目小於corePoolSize,則每來一個任務,就會建立執行這個任務

ThreadPoolExecutor

2、如果當前執行緒池中的執行緒數目>=corePoolSize,則每來一個任務,會嘗試將其新增到任務快取隊列當中

2.1、若新增成功,則該任務會等待空閒執行緒將其取出去執行

ThreadPoolExecutor

2.2、若新增失敗(一般來說是任務快取佇列已滿),則會嘗試建立新的執行緒去執行這個任務

ThreadPoolExecutor

3、如果當前執行緒池中的執行緒數目達到maximumPoolSize,則會採取任務拒絕策略進行處理

ThreadPoolExecutor

如果執行緒池中的執行緒數量大於 corePoolSize時,如果某執行緒空閒時間超過keepAliveTime,執行緒將被終止,直至執行緒池中的執行緒數目不大於corePoolSize;如果允許為核心池中的執行緒設定存活時間,那麼核心池中的執行緒空閒時間超過keepAliveTime,執行緒也會被終止

4. 阻塞佇列的介紹

4.1 BlockingQueue

阻塞佇列 功能描述
BlockingQueue 阻塞佇列的頂級介面,主要用於實現生產者消費者佇列
BlockingDeque 雙端佇列
SynchronousQueue 同步佇列,無界佇列,直接提交策略,交替佇列,在某次新增元素後必須等待其他執行緒取走後才能繼續新增
LinkedBlockingQueue 無界佇列,基於連結串列的阻塞佇列,可以併發執行,FIFO
ArrayBlockingQueue 基於陣列的有界(固定大小的陣列)阻塞佇列,只有put方法和take方法才具有阻塞功能,公平性 fairness
PriorityBlockingQueue 基於優先順序的阻塞佇列,依據物件的自然排序順序或者是建構函式所帶的Comparator決定的順序
DelayQueue 延時佇列

4.2 排隊策略

直接提交

工作佇列的預設選項是 SynchronousQueue,它將任務直接提交給執行緒而不保持它們。在此,如果不存在可用於立即執行任務的執行緒,則試圖把任務加入佇列將失敗,因此會構造一個新的執行緒。此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務。當命令以超過佇列所能處理的平均數連續到達時,此策略允許無界執行緒具有增長的可能性。

無界佇列

使用無界佇列(例如,不具有預定義容量的 LinkedBlockingQueue)使用無界佇列將導致在所有 corePoolSize 執行緒都忙時新任務在佇列中等待。這樣,建立的執行緒就不會超過 corePoolSize。(因此,maximumPoolSize 的值也就無效了。)當每個任務完全獨立於其他任務,即任務執行互不影響時,適合於使用無界佇列。例如,在 Web 頁伺服器中。這種排隊可用於處理瞬態突發請求,當命令以超過佇列所能處理的平均數連續到達時,此策略允許無界執行緒具有增長的可能性

有界佇列

當使用有限的 maximumPoolSizes 時,有界佇列(如 ArrayBlockingQueue)有助於防止資源耗盡,但是可能較難調整和控制。佇列大小和最大池大小可能需要相互折衷:使用大型佇列和小型池可以最大限度地降低 CPU 使用率、作業系統資源和上下文切換開銷,但是可能導致人工降低吞吐量。如果任務頻繁阻塞(例如,如果它們是 I/O 邊界),則系統可能為超過您許可的更多執行緒安排時間。使用小型佇列通常要求較大的池大小,CPU 使用率較高,但是可能遇到不可接受的排程開銷,這樣也會降低吞吐量。

4.3 BlockingQueue

方法 Throw exception 丟擲異常 Special value 特殊值 Blocks 阻塞 Time out 超時
Insert add() offer() put() offer(e,time,unit)
Remove remove() poll() take() poll(time,unit)
Examine檢查 element() peek() 不可用 不可用

BlockingQueue 不接受 null 元素。試圖 addputoffer 一個 null 元素時,某些實現會丟擲 NullPointerExceptionnull 被用作指示 poll 操作失敗的警戒值。

BlockingQueue 可以是限定容量的。它在任意給定時間都可以有一個 remainingCapacity,超出此容量,便無法無阻塞地 put 附加元素。沒有任何內部容量約束的 BlockingQueue 總是報告 Integer.MAX_VALUE 的剩餘容量。

BlockingQueue 實現主要用於生產者-使用者佇列,但它另外還支援 Collection 介面。因此,舉例來說,使用 remove(x) 從佇列中移除任意一個元素是有可能的。然而,這種操作通常 會有效執行,只能有計劃地偶爾使用,比如在取消排隊資訊時。

BlockingQueue 實現是執行緒安全的。所有排隊方法都可以使用內部鎖或其他形式的併發控制來自動達到它們的目的。然而,大量的 Collection 操作(addAllcontainsAllretainAllremoveAll)沒有必要自動執行,除非在實現中特別說明。因此,舉例來說,在只添加了 c 中的一些元素後,addAll(c) 有可能失敗(丟擲一個異常)。

BlockingQueue 實質上不支援使用任何一種“close”或“shutdown”操作來指示不再新增任何項。這種功能的需求和使用有依賴於實現的傾向。例如,一種常用的策略是:對於生產者,插入特殊的end-of-stream或poison物件,並根據使用者獲取這些物件的時間來對它們進行解釋。

4.4 BlockingDeque

雙端佇列

4.5 ArrayBlockingQueue

一個由陣列支援的有界阻塞佇列。此佇列按 FIFO(先進先出)原則對元素進行排序。建立其物件必須明確大小,像陣列一樣。其內部實現是將物件放到一個數組裡。有界也就意味著,它不能夠儲存無限多數量的元素。它有一個同一時間能夠儲存元素數量的上限。你可以在對其初始化的時候設定這個上限,但之後就無法對這個上限進行修改了(譯者注:因為它是基於陣列實現的,也就具有陣列的特性:一旦初始化,大小就無法修改)。

實現互斥,你一下我一下

public class BlockingQueueCondition {
    public static void main(String[] args) {
        ExecutorService service = Executors.newSingleThreadExecutor();
        final Business3 business = new Business3();
        service.execute(new Runnable(){
            public void run() {
                for(int i=0;i<50;i++){
                    business.sub();
                }
            }
        });
        for(int i=0;i<50;i++){
            business.main();
        }
    }
}
class Business3{
    BlockingQueue subQueue  = new ArrayBlockingQueue(1);
    BlockingQueue mainQueue = new ArrayBlockingQueue(1);
    {
        try {
            mainQueue.put(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public void sub(){
        try
        {
            mainQueue.take();
            for(int i=0;i<10;i++){
                System.out.println(Thread.currentThread().getName() + " : " + i);
            }
            subQueue.put(1);
        }catch(Exception e){
        }
    }
    public void main(){
        try
        {
            subQueue.take();
            for(int i=0;i<5;i++){
                System.out.println(Thread.currentThread().getName() + " : " + i);
            }
            mainQueue.put(1);
        }catch(Exception e){
        }
    }
}

輸出結果

pool-1-thread-1 : 0
pool-1-thread-1 : 1
pool-1-thread-1 : 2
pool-1-thread-1 : 3
pool-1-thread-1 : 4
pool-1-thread-1 : 5
pool-1-thread-1 : 6
pool-1-thread-1 : 7
pool-1-thread-1 : 8
pool-1-thread-1 : 9
main : 0
main : 1
main : 2
main : 3
main : 4
pool-1-thread-1 : 0
pool-1-thread-1 : 1
pool-1-thread-1 : 2
pool-1-thread-1 : 3
pool-1-thread-1 : 4
pool-1-thread-1 : 5
pool-1-thread-1 : 6
pool-1-thread-1 : 7
pool-1-thread-1 : 8
pool-1-thread-1 : 9
main : 0
main : 1
main : 2
main : 3
main : 4
pool-1-thread-1 : 0
pool-1-thread-1 : 1
pool-1-thread-1 : 2
pool-1-thread-1 : 3
pool-1-thread-1 : 4
pool-1-thread-1 : 5
pool-1-thread-1 : 6
pool-1-thread-1 : 7
pool-1-thread-1 : 8
pool-1-thread-1 : 9
...

4.6 LinkedBlockingQueue

一個可改變大小的阻塞佇列。此佇列按 FIFO(先進先出)原則對元素進行排序。建立其物件如果沒有明確大小,預設值是Integer.MAX_VALUE。連結佇列的吞吐量通常要高於基於陣列的佇列,但是在大多數併發應用程式中,其可預知的效能要低。

4.7 SynchronousQueue

同步佇列。同步佇列沒有任何容量,每個插入必須等待另一個執行緒移除,反之亦然。是一個特殊的佇列,它的內部同時只能夠容納單個元素。如果該佇列已有一元素的話,試圖向佇列中插入一個新元素的執行緒將會阻塞,直到另一個執行緒將該元素從佇列中抽走。同樣,如果該佇列為空,試圖向佇列中抽取一個元素的執行緒將會阻塞,直到另一個執行緒向佇列中插入了一條新的元素。據此,把這個類稱作一個佇列顯然是誇大其詞了。它更多像是一個匯合點。

4.8 DelayQueue

延時佇列,對元素進行持有直到一個特定的延遲到期,只有在延遲期滿時才能從中提取元素。注入其中的元素必須實現 java.util.concurrent.Delayed 介面。

4.9 PriorityBlockingQueue

基於優先順序的阻塞佇列,依據物件的自然排序順序或者是建構函式所帶的Comparator決定的順序,應用:Volley

4.10 生產者消費者模式

生產者生產任務,消費者消費任務,那麼這時就需要一個任務佇列,生產者向佇列裡插入任務,消費者從佇列裡提取任務執行

5. 執行緒池工具類Executors

jdk1.5之後的一個新類,提供了一些靜態工廠,生成一些常用的執行緒池,ThreadPoolExecutor是Executors類的底層實現

方法 功能描述
newCachedThreadPool() 建立一個可快取的執行緒池
newFixedThreadPool() 建立一個固定大小的執行緒池
newScheduledThreadPool() 建立一個大小無限的執行緒池。此執行緒池支援定時以及週期性執行任務的需求
newSingleThreadExecutor() 建立單個執行緒的執行緒池,始終保證執行緒池中會有一個執行緒在。當某執行緒死去,會找繼任者
defaultThreadFactory() 建立一個預設執行緒池工廠

6. 執行緒池

線上程池的程式設計模式下,任務是提交給整個執行緒池,而不是直接交給某個執行緒,執行緒池在拿到任務後,它就在內部找有無空閒的執行緒,再把任務交給內部某個空閒的執行緒,這就是封裝

記住:任務是提交給整個執行緒池,一個執行緒同時只能執行一個任務,但可以同時向一個執行緒池提交多個任務。

示例:

  • 建立固定大小的執行緒池
  • 建立快取執行緒池
  • 用執行緒池建立定時器
  • 建立單一執行緒池(始終保證執行緒池中會有一個執行緒在。當某執行緒死去,會找繼任者)

注意

定時器中總是相對時間,我們要想指定具體時間的方法:比如明天早上10點鐘執行,則可以使用明天早上10點的時間減去當前的時間,得到時間間隔

import java.util.concurrent.ExecutorService;  
import java.util.concurrent.Executors;  
import java.util.concurrent.TimeUnit;  

public class ThreadPoolTest {  
    public static void main(String[] args){  

        //建立固定大小的執行緒池,這裡只能完成3個任務  
        //ExecutorService threadPool = Executors.newFixedThreadPool(3);  

        //建立快取執行緒池,根據任務來自動建立執行緒的數量,可以完成建立的所有任務  
        //ExecutorService threadPool = Executors.newCachedThreadPool();  

        //建立單一執行緒池(始終保持執行緒池中有一個執行緒存活。當唯一執行緒死去,會建立新的繼任者、  
        ExecutorService threadPool = Executors.newSingleThreadExecutor();  

        for(int i=1;i<=10;i++){  
  //內部類不能訪問外部類的區域性變數,所以i要定義為final,又由於i++.  
  //所以在迴圈內部定義一個變數接收i  
            final int task = i;  
        threadPool.execute(new Runnable() {  

            @Override  
            public void run() {  
                for(int j=1;j<=10;j++){  
                    System.out.println(Thread.currentThread().getName()  
                            +" is looping of "+ j+"  for task of " +task);  
                }  

            }  
        });  
        }  
        //驗證10個任務都提交給了執行緒池  
        System.out.println("all of 10 tasks have committed! ");  
        //threadPool.shutdown();        //等任務完成後,殺死執行緒、  
        //threadPool.shutdownNow();     //立即停止執行緒  

        //用執行緒池啟動定時器  

        Executors.newScheduledThreadPool(3).schedule(  
                new Runnable() {  //任務  
                @Override  
                public void run() {  
                    System.out.println("bombing!");  
                }  
            },   
                    5,  //5秒以後執行  
                    TimeUnit.SECONDS);  //單位  

    //在某個時間執行一次後,再指定後續的執行間隔時間  
        Executors.newScheduledThreadPool(3).scheduleAtFixedRate(new Runnable(){  

            @Override  
            public void run() {           
                System.out.println("bombing!");  
            }  

        }, 10,   //第一次在10秒時爆炸  
            3,          //以後每隔3秒爆炸一次。  
        TimeUnit.SECONDS);   

    }  
}  

7. 執行緒池的簡單使用

/**
 * 一個簡易的執行緒池管理類,提供三個執行緒池
 */
public class ThreadManager {
    public static final String DEFAULT_SINGLE_POOL_NAME = "DEFAULT_SINGLE_POOL_NAME";

    private static ThreadPoolProxy mLongPool = null;
    private static Object mLongLock = new Object();

    private static ThreadPoolProxy mShortPool = null;
    private static Object mShortLock = new Object();

    private static ThreadPoolProxy mDownloadPool = null;
    private static Object mDownloadLock = new Object();

    private static Map<String, ThreadPoolProxy> mMap = new HashMap<String, ThreadPoolProxy>();
    private static Object mSingleLock = new Object();

    /** 獲取下載執行緒 */
    public static ThreadPoolProxy getDownloadPool() {
        synchronized (mDownloadLock) {
            if (mDownloadPool == null) {
                mDownloadPool = new ThreadPoolProxy(3, 3, 5L);
            }
            return mDownloadPool;
        }
    }

    /** 獲取一個用於執行長耗時任務的執行緒池,避免和短耗時任務處在同一個佇列而阻塞了重要的短耗時任務,通常用來聯網操作 */
    public static ThreadPoolProxy getLongPool() {
        synchronized (mLongLock) {
            if (mLongPool == null) {
                mLongPool = new ThreadPoolProxy(5, 5, 5L);
            }
            return mLongPool;
        }
    }

    /** 獲取一個用於執行短耗時任務的執行緒池,避免因為和耗時長的任務處在同一個佇列而長時間得不到執行,通常用來執行本地的IO/SQL */
    public static ThreadPoolProxy getShortPool() {
        synchronized (mShortLock) {
            if (mShortPool == null) {
                mShortPool = new ThreadPoolProxy(2, 2, 5L);
            }
            return mShortPool;
        }
    }

    /** 獲取一個單執行緒池,所有任務將會被按照加入的順序執行,免除了同步開銷的問題 */
    public static ThreadPoolProxy getSinglePool() {
        return getSinglePool(DEFAULT_SINGLE_POOL_NAME);
    }

    /** 獲取一個單執行緒池,所有任務將會被按照加入的順序執行,免除了同步開銷的問題 */
    public static ThreadPoolProxy getSinglePool(String name) {
        synchronized (mSingleLock) {
            ThreadPoolProxy singlePool = mMap.get(name);
            if (singlePool == null) {
                singlePool = new ThreadPoolProxy(1, 1, 5L);
                mMap.put(name, singlePool);
            }
            return singlePool;
        }
    }

    public static class ThreadPoolProxy {
        private ThreadPoolExecutor mPool;
        private int mCorePoolSize;
        private int mMaximumPoolSize;
        private long mKeepAliveTime;

        private ThreadPoolProxy(int corePoolSize, int maximumPoolSize, long keepAliveTime) {
            mCorePoolSize = corePoolSize;
            mMaximumPoolSize = maximumPoolSize;
            mKeepAliveTime = keepAliveTime;
        }

        /** 執行任務,當執行緒池處於關閉,將會重新建立新的執行緒池 */
        public synchronized void execute(Runnable run) {
            if (run == null) {
                return;
            }
            if (mPool == null || mPool.isShutdown()) {
                //引數說明
                //當執行緒池中的執行緒小於mCorePoolSize,直接建立新的執行緒加入執行緒池執行任務
                //當執行緒池中的執行緒數目等於mCorePoolSize,將會把任務放入任務佇列BlockingQueue中
                //當BlockingQueue中的任務放滿了,將會建立新的執行緒去執行,
                //但是當匯流排程數大於mMaximumPoolSize時,將會丟擲異常,交給RejectedExecutionHandler處理
                //mKeepAliveTime是執行緒執行完任務後,且佇列中沒有可以執行的任務,存活的時間,後面的引數是時間單位
                //ThreadFactory是每次建立新的執行緒工廠
                mPool = new ThreadPoolExecutor(mCorePoolSize, mMaximumPoolSize, mKeepAliveTime, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), Executors.defaultThreadFactory(), new AbortPolicy());
            }
            mPool.execute(run);
        }

        /** 取消執行緒池中某個還未執行的任務 */
        public synchronized void cancel(Runnable run) {
            if (mPool != null && (!mPool.isShutdown() || mPool.isTerminating())) {
                mPool.getQueue().remove(run);
            }
        }

        /** 取消執行緒池中某個還未執行的任務 */
        public synchronized boolean contains(Runnable run) {
            if (mPool != null && (!mPool.isShutdown() || mPool.isTerminating())) {
                return mPool.getQueue().contains(run);
            } else {
                return false;
            }
        }

        /** 立刻關閉執行緒池,並且正在執行的任務也將會被中斷 */
        public void stop() {
            if (mPool != null && (!mPool.isShutdown() || mPool.isTerminating())) {
                mPool.shutdownNow();
            }
        }

        /** 平緩關閉單任務執行緒池,但是會確保所有已經加入的任務都將會被執行完畢才關閉 */
        public synchronized void shutdown() {
            if (mPool != null && (!mPool.isShutdown() || mPool.isTerminating())) {
                mPool.shutdownNow();
            }
        }
    }
}