1. 程式人生 > >跟我學Java多執行緒——執行緒池與阻塞佇列

跟我學Java多執行緒——執行緒池與阻塞佇列

前言

    上一篇文章中我們將ThreadPoolExecutor進行了深入的學習和介紹,實際上我們在專案中應用的時候很少有直接應用ThreadPoolExecutor來建立執行緒池的,在jdkapi中有這麼一句話“但是,強烈建議程式設計師使用較為方便的 Executors 工廠方法Executors.newCachedThreadPool()(無界執行緒池,可以進行自動執行緒回收)、Executors.newFixedThreadPool(int)(固定大小執行緒池)和Executors.newSingleThreadExecutor()(單個後臺執行緒),它們均為大多數使用場景預定義了設定。”所以這篇文章我們繼續學習其它幾種執行緒池。

執行緒池分類

newCachedThreadPool()

建立一個可快取的執行緒池,即這個執行緒池是無界執行緒池,無界指工作執行緒的建立數量幾乎沒有限制(其實也有限制的,數目為Interger.MAX_VALUE),這樣可以靈活的往執行緒池中新增資料;可以進行自動執行緒回收指的是如果長時間沒有往執行緒池中提交任務,即如果工作執行緒空閒了指定的時間,則該工作執行緒將自動終止。終止後,如果你又提交了新的任務,則執行緒池重新建立一個工作執行緒。

我們一般使用如下程式碼進行建立:

ExecutorServiceservice = Executors.newCachedThreadPool();

我們點選程式碼進入原始碼:

   /**
     * Creates a thread pool that creates newthreads as needed, but
     * will reuse previously constructedthreads when they are
     * available.  These pools will typically improve theperformance
     * of programs that execute manyshort-lived asynchronous tasks.
     * Calls to {@code execute} will reusepreviously constructed
     * threads if available. If no existingthread is available, a new
     * thread will be created and added to thepool. Threads that have
     * not been used for sixty seconds areterminated and removed from
     * the cache. Thus, a pool that remainsidle for long enough will
     * not consume any resources. Note thatpools with similar
     * properties but different details (forexample, timeout parameters)
     * may be created using {@linkThreadPoolExecutor} constructors.
     *
     * @return the newly created thread pool
     */
    public static ExecutorServicenewCachedThreadPool() {
        return new ThreadPoolExecutor(0,Integer.MAX_VALUE,
                                      60L,TimeUnit.SECONDS,
                                      newSynchronousQueue<Runnable>());
    }
 

看到程式碼有沒有很熟悉,呼叫的上我們上一篇文章中的ThreadPoolExecutor類的構造方法,只不過核心執行緒數為0,同時指定一個最大執行緒數。

newFixedThreadPool(int)

固定大小執行緒池這個很好理解,就是建立一個指定工作執行緒數量的執行緒池,如果執行緒達到設定的最大數,就將提交的任務放到執行緒池的佇列中。一個典型且優秀的執行緒池,它具有執行緒池提高程式效率和節省建立執行緒時所耗的開銷的優點。但線上程池空閒時,即執行緒池中沒有可執行任務時,它不會釋放工作執行緒,還會佔用一定的系統資源。

一般建立:

ExecutorServicenewFixedThreadPool=Executors.newFixedThreadPool(5);

點選進入原始碼:

    /**
     * Creates a thread pool that reuses afixed number of threads
     * operating off a shared unboundedqueue.  At any point, at most
     * {@code nThreads} threads will be activeprocessing tasks.
     * If additional tasks are submitted whenall threads are active,
     * they will wait in the queue until athread is available.
     * If any thread terminates due to afailure during execution
     * prior to shutdown, a new one will takeits place if needed to
     * execute subsequent tasks.  The threads in the pool will exist
     * until it is explicitly {@linkExecutorService#shutdown shutdown}.
     *
     * @param nThreads the number of threads inthe pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if{@code nThreads <= 0}
     */
    public static ExecutorServicenewFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads,nThreads,
                                      0L,TimeUnit.MILLISECONDS,
                                      newLinkedBlockingQueue<Runnable>());
    }

呼叫的依然是我們上一篇文章中的ThreadPoolExecutor類的構造方法,只不過核心執行緒數為和最大執行緒數一樣都是我們人為指定的。

newSingleThreadExecutor()

單執行緒執行緒池,只建立唯一的執行緒來執行任務,如果這個執行緒異常結束,會有另一個取代它,保證順序執行。

一般建立方法:

ExecutorServicenewSingleThreadExecutor = Executors.newSingleThreadExecutor();
點選進入原始碼:
    /**
     * Creates an Executor that uses a singleworker thread operating
     * off an unbounded queue. (Note howeverthat if this single
     * thread terminates due to a failureduring execution prior to
     * shutdown, a new one will take its placeif needed to execute
     * subsequent tasks.)  Tasks are guaranteed to execute
     * sequentially, and no more than one taskwill be active at any
     * given time. Unlike the otherwiseequivalent
     * {@code newFixedThreadPool(1)} thereturned executor is
     * guaranteed not to be reconfigurable touse additional threads.
     *
     * @return the newly createdsingle-threaded Executor
     */
    public static ExecutorServicenewSingleThreadExecutor() {
        return newFinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L,TimeUnit.MILLISECONDS,
                                    newLinkedBlockingQueue<Runnable>()));
    }
 
    呼叫的依然是我們上一篇文章中的ThreadPoolExecutor類的構造方法,只不過核心執行緒數為和最大執行緒數一樣都是1

介紹到這裡我們發現這三個執行緒池呼叫的都是ThreadPoolExecutor的建構函式,這三個執行緒的區別除了核心執行緒數和最大執行緒數引數不一樣外,最重要的是傳入的最後一個引數即workQueue是不一樣的。

newCachedThreadPool的引數為SynchronousQueue,newFixedThreadPoolnewSingleThreadExecutor的引數都為LinkedBlockingQueue,其實這一種排隊策略也叫阻塞佇列,那接下來我們就來介紹一下常見的阻塞佇列。

阻塞佇列BlockingQueue

阻塞佇列顧名思義首先它是一個佇列,常見的佇列有“後進先出”的棧和“先進先出”的佇列。多執行緒環境中,通過佇列可以很容易實現資料共享,最經典的就是“生產者”和“消費者”模型,這就是一個典型的阻塞佇列,比如生產者生產到一定程度必須停一下,讓生產者執行緒掛起,這就是阻塞。

在多執行緒領域:所謂阻塞,在某些情況下會掛起執行緒(即阻塞),一旦條件滿足,被掛起的執行緒又會自動被喚醒)

java.util.concurrent包中的BlockingQueue就是阻塞佇列的介面,作為BlockingQueue的使用者,我們再也不需要關心什麼時候需要阻塞執行緒,什麼時候需要喚醒執行緒,因為這一切BlockingQueue都給你一手包辦了,並且它還是執行緒安全的。那我們現在來看下BlockingQueue介面的原始碼:

public interfaceBlockingQueue<E> extends Queue<E> {
 
    boolean add(E e);
 
 
    boolean offer(E e);
 
 
    void put(E e) throws InterruptedException;
 
 
    boolean offer(E e, long timeout, TimeUnitunit)
        throws InterruptedException;
 
 
    E take() throws InterruptedException;
 
 
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;
 
 
    int remainingCapacity();
 
 
    boolean remove(Object o);
 
 
    public boolean contains(Object o);
 
 
    int drainTo(Collection<? super E> c);
 
 
    int drainTo(Collection<? super E> c,int maxElements);
}

    上面就是介面的所有方法,現在我們就介紹下這個介面中的核心方法:

放入資料:

    boolean add(E e);

這個方法將將泛型物件加到BlockingQueue裡,即如果BlockingQueue可以容納,則返回true,否則返回false.(本方法不阻塞當前執行方法的執行緒)

    boolean offer(E e);

這個方法將將泛型物件加到BlockingQueue裡,即如果BlockingQueue可以容納,則返回true,否則返回false.(本方法不阻塞當前執行方法的執行緒)

    boolean offer(E e, long timeout, TimeUnitunit)throws InterruptedException;

這個方法可以設定等待的時間,如果在指定的時間內,還不能往佇列中加入BlockingQueue,則返回失敗。(本方法不阻塞當前執行方法的執行緒)

    void put(E e) throws InterruptedException;

這個方法泛型物件放到BlockingQueue裡,如果BlockQueue沒有空間,則呼叫此方法的執行緒被阻斷直到BlockingQueue裡面有空間再繼續.(本方法有阻塞的功能)

移除資料:

    boolean remove(Object o);

這個方法從BlockingQueue取出一個隊首的物件,如果在指定時間內佇列一旦有資料可取,則立即返回佇列中的資料。否則知道時間超時還沒有資料可取,返回失敗。(本方法不阻塞當前執行方法的執行緒)

    E poll(long timeout, TimeUnit unit) throws InterruptedException;

這個方法從BlockingQueue取出一個隊首的物件,如果在指定時間內佇列一旦有資料可取,則立即返回佇列中的資料。否則知道時間超時還沒有資料可取,返回失敗。(本方法不阻塞當前執行方法的執行緒)

     E take() throws InterruptedException;

這個方法是取走BlockingQueue裡排在首位的物件,若BlockingQueue為空,阻斷進入等待狀態直到BlockingQueue有新的資料被加入;(本方法有阻塞的功能)

    int drainTo(Collection<? super E> c);

這個方法是取走BlockingQueue裡排在首位的物件,取不到時返回null;(本方法不阻塞當前執行方法的執行緒)

    int drainTo(Collection<? super E> c,int maxElements);

這個方法是取走BlockingQueue裡排在首位的物件,若不能立即取出,則可以等time引數規定的時間,取不到時返回null;(本方法不阻塞當前執行方法的執行緒)

總結一下BlockingQueue介面中的方法,這些方法以四種形式出現,對於不能立即滿足但可能在將來某一時刻可以滿足的操作,這四種形式的處理方式不同:第一種是丟擲一個異常,第二種是返回一個特殊值(null或 false,具體取決於操作),第三種是在操作可以成功前,無限期地阻塞當前執行緒,第四種是在放棄前只在給定的最大時間限制內阻塞。

丟擲異常

特殊值

阻塞

超時

插入

add(e)

offer(e)

put(e)

offer(e,time,unit)

移除

remove()

poll()

take()

poll(time,unit)

檢查

element()

peek()

不可用

不可用

BlockingQueue實現類


1)ArrayBlockingQueue:基於陣列實現的一個阻塞佇列,在建立ArrayBlockingQueue物件時必須制定容量大小,以便快取佇列中資料物件。並且可以指定公平性與非公平性,預設情況下為非公平的,即不保證等待時間最長的佇列最優先能夠訪問佇列。其所含的物件是以FIFO(先入先出)順序排序的.

2)LinkedBlockingQueue:基於連結串列實現的一個阻塞佇列,在建立LinkedBlockingQueue物件時如果不指定容量大小,則預設大小為Integer.MAX_VALUE。其所含的物件是以FIFO(先入先出)順序排序的

3)PriorityBlockingQueue:類似於LinkedBlockQueue,但其所含物件的排序不是FIFO,它會按照元素的優先順序對元素進行排序,按照優先順序順序出隊,每次出隊的元素都是優先順序最高的元素。注意,此阻塞佇列為無界阻塞佇列,即容量沒有上限(通過原始碼就可以知道,它沒有容器滿的訊號標誌),前面2種都是有界佇列。

4)DelayQueue:基於PriorityQueue,一種延時阻塞佇列,DelayQueue中的元素只有當其指定的延遲時間到了,才能夠從佇列中獲取到該元素。DelayQueue也是一個無界佇列,因此往佇列中插入資料的操作(生產者)永遠不會被阻塞,而只有獲取資料的操作(消費者)才會被阻塞。

5)SynchronousQueue:一種無緩衝的等待佇列,類似於無中介的直接交易,

其中LinkedBlockingQueue和ArrayBlockingQueue比較起來,它們背後所用的資料結構不一樣,導致LinkedBlockingQueue的資料吞吐量要大於ArrayBlockingQueue,但線上程數量很大時其效能的可預見性低於ArrayBlockingQueue.

總結

我們這篇文章延續了上一篇文章中關於ThreadPoolExecutor執行緒池的一些內容,分別是newCachedThreadPool、newFixedThreadPool、newSingleThreadExecutor,同時根據這些執行緒池與ThreadPoolExecutor的關係,進而引出了阻塞佇列BlockingQueue,於是我們詳細介紹了介面BlockingQueue和介面中的方法,最後又介紹了介面BlockingQueue的實現類。