1. 程式人生 > >跟我學Java多線程——線程池與堵塞隊列

跟我學Java多線程——線程池與堵塞隊列

信號 線程的創建 margin cit rect weight offer 成功 rain

前言

上一篇文章中我們將ThreadPoolExecutor進行了深入的學習和介紹,實際上我們在項目中應用的時候非常少有直接應用ThreadPoolExecutor來創建線程池的。在jdkapi中有這麽一句話“可是,強烈建議程序猿使用較為方便的 Executors 工廠方法Executors.newCachedThreadPool()(無界線程池,能夠進行自己主動線程回收)、Executors.newFixedThreadPool(int)(固定大小線程池)和Executors.newSingleThreadExecutor()(單個後臺線程),它們均為大多數使用場景提前定義了設置。

”所以這篇文章我們繼續學習其他幾種線程池。

線程池分類

newCachedThreadPool()

創建一個可緩存的線程池。即這個線程池是無界線程池。無界指工作線程的創建數量差點兒沒有限制(事實上也有限制的,數目為Interger.MAX_VALUE),這樣能夠靈活的往線程池中加入數據;能夠進行自己主動線程回收指的是假設長時間沒有往線程池中提交任務。即假設工作線程空暇了指定的時間,則該工作線程將自己主動終止。終止後。假設你又提交了新的任務。則線程池又一次創建一個工作線程。

我們一般使用例如以下代碼進行創建:

ExecutorServiceservice = Executors.newCachedThreadPool();

我們點擊代碼進入源代碼:

   /**
     * Creates a thread pool that creates newthreads as needed, but
     * will reuse previously constructedthreads when they are
     * available.  These pools will typically improve theperformance
     * of programs that execute manyshort-lived asynchronous tasks.
     * Calls to [email protected]
/* */ execute} will reusepreviously constructed * threads if available. If no existingthread is available, a new * thread will be created and added to thepool. Threads that have * not been used for sixty seconds areterminated and removed from * the cache. Thus, a pool that remainsidle for long enough will * not consume any resources. Note thatpools with similar * properties but different details (forexample, timeout parameters) * may be created using [email protected]} constructors. * * @return the newly created thread pool */ public static ExecutorServicenewCachedThreadPool() { return new ThreadPoolExecutor(0,Integer.MAX_VALUE, 60L,TimeUnit.SECONDS, newSynchronousQueue<Runnable>()); }

看到代碼有沒有非常熟悉。調用的上我們上一篇文章中的ThreadPoolExecutor類的構造方法,僅僅只是核心線程數為0。同一時候指定一個最大線程數。


newFixedThreadPool(int)

固定大小線程池這個非常好理解。就是創建一個指定工作線程數量的線程池,假設線程達到設置的最大數。就將提交的任務放到線程池的隊列中。一個典型且優秀的線程池,它具有線程池提高程序效率和節省創建線程時所耗的開銷的長處。

但在線程池空暇時,即線程池中沒有可執行任務時,它不會釋放工作線程,還會占用一定的系統資源。

一般創建:

ExecutorServicenewFixedThreadPool=Executors.newFixedThreadPool(5);

點擊進入源代碼:

    /**
     * Creates a thread pool that reuses afixed number of threads
     * operating off a shared unboundedqueue.  At any point, at most
     * [email protected] nThreads} threads will be activeprocessing tasks.
     * If additional tasks are submitted whenall threads are active,
     * they will wait in the queue until athread is available.
     * If any thread terminates due to afailure during execution
     * prior to shutdown, a new one will takeits place if needed to
     * execute subsequent tasks.  The threads in the pool will exist
     * until it is explicitly [email protected]#shutdown shutdown}.
     *
     * @param nThreads the number of threads inthe pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException [email protected] nThreads <= 0}
     */
    public static ExecutorServicenewFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads,nThreads,
                                      0L,TimeUnit.MILLISECONDS,
                                      newLinkedBlockingQueue<Runnable>());
    }

調用的依舊是我們上一篇文章中的ThreadPoolExecutor類的構造方法,僅僅只是核心線程數為和最大線程數一樣都是我們人為指定的。

newSingleThreadExecutor()

單線程線程池,僅僅創建唯一的線程來運行任務。假設這個線程異常結束。會有還有一個代替它,保證順序運行。

一般創建方法:

ExecutorServicenewSingleThreadExecutor = Executors.newSingleThreadExecutor();
點擊進入源代碼:

    /**
     * Creates an Executor that uses a singleworker thread operating
     * off an unbounded queue. (Note howeverthat if this single
     * thread terminates due to a failureduring execution prior to
     * shutdown, a new one will take its placeif needed to execute
     * subsequent tasks.)  Tasks are guaranteed to execute
     * sequentially, and no more than one taskwill be active at any
     * given time. Unlike the otherwiseequivalent
     * [email protected] newFixedThreadPool(1)} thereturned executor is
     * guaranteed not to be reconfigurable touse additional threads.
     *
     * @return the newly createdsingle-threaded Executor
     */
    public static ExecutorServicenewSingleThreadExecutor() {
        return newFinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L,TimeUnit.MILLISECONDS,
                                    newLinkedBlockingQueue<Runnable>()));
    }
 
調用的依舊是我們上一篇文章中的ThreadPoolExecutor類的構造方法,僅僅只是核心線程數為和最大線程數一樣都是1

介紹到這裏我們發現這三個線程池調用的都是ThreadPoolExecutor的構造函數。這三個線程的差別除了核心線程數和最大線程數參數不一樣外,最重要的是傳入的最後一個參數即workQueue是不一樣的。

newCachedThreadPool的參數為SynchronousQueue,newFixedThreadPoolnewSingleThreadExecutor的參數都為LinkedBlockingQueue。事實上這一種排隊策略也叫堵塞隊列。那接下來我們就來介紹一下常見的堵塞隊列。

堵塞隊列BlockingQueue

堵塞隊列顧名思義首先它是一個隊列,常見的隊列有“後進先出”的棧和“先進先出”的隊列。多線程環境中,通過隊列能夠非常easy實現數據共享,最經典的就是“生產者”和“消費者”模型。這就是一個典型的堵塞隊列,比方生產者生產到一定程度必須停一下,讓生產者線程掛起,這就是堵塞。

在多線程領域:所謂堵塞,在某些情況下會掛起線程(即堵塞)。一旦條件滿足。被掛起的線程又會自己主動被喚醒)

java.util.concurrent包中的BlockingQueue就是堵塞隊列的接口,作為BlockingQueue的使用者,我們再也不須要關心什麽時候須要堵塞線程。什麽時候須要喚醒線程,由於這一切BlockingQueue都給你一手包辦了,而且它還是線程安全的。那我們如今來看下BlockingQueue接口的源代碼:

public interfaceBlockingQueue<E> extends Queue<E> {
 
    boolean add(E e);
 
 
    boolean offer(E e);
 
 
    void put(E e) throws InterruptedException;
 
 
    boolean offer(E e, long timeout, TimeUnitunit)
        throws InterruptedException;
 
 
    E take() throws InterruptedException;
 
 
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;
 
 
    int remainingCapacity();
 
 
    boolean remove(Object o);
 
 
    public boolean contains(Object o);
 
 
    int drainTo(Collection<?

super E> c); int drainTo(Collection<?

super E> c,int maxElements); }

上面就是接口的全部方法,如今我們就介紹下這個接口中的核心方法:


放入數據:

    boolean add(E e);

這種方法將將泛型對象加到BlockingQueue裏,即假設BlockingQueue能夠容納,則返回true,否則返回false.(本方法不堵塞當前運行方法的線程)

    boolean offer(E e);

這種方法將將泛型對象加到BlockingQueue裏,即假設BlockingQueue能夠容納,則返回true,否則返回false.(本方法不堵塞當前運行方法的線程)


    boolean offer(E e, long timeout, TimeUnitunit)throws InterruptedException;

這種方法能夠設定等待的時間,假設在指定的時間內,還不能往隊列中增加BlockingQueue,則返回失敗。(本方法不堵塞當前運行方法的線程)

    void put(E e) throws InterruptedException;

這種方法泛型對象放到BlockingQueue裏,假設BlockQueue沒有空間,則調用此方法的線程被阻斷直到BlockingQueue裏面有空間再繼續.(本方法有堵塞的功能)


移除數據:

    boolean remove(Object o);

這種方法從BlockingQueue取出一個隊首的對象,假設在指定時間內隊列一旦有數據可取,則馬上返回隊列中的數據。否則知道時間超時還沒有數據可取,返回失敗。(本方法不堵塞當前運行方法的線程)

    E poll(long timeout, TimeUnit unit) throws InterruptedException;

這種方法從BlockingQueue取出一個隊首的對象,假設在指定時間內隊列一旦有數據可取,則馬上返回隊列中的數據。否則知道時間超時還沒有數據可取。返回失敗。(本方法不堵塞當前運行方法的線程)

     E take() throws InterruptedException;

這種方法是取走BlockingQueue裏排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態直到BlockingQueue有新的數據被增加;(本方法有堵塞的功能)

    int drainTo(Collection<? super E> c);

這種方法是取走BlockingQueue裏排在首位的對象,取不到時返回null;(本方法不堵塞當前運行方法的線程)

    int drainTo(Collection<?

super E> c,int maxElements);

這種方法是取走BlockingQueue裏排在首位的對象,若不能馬上取出,則能夠等time參數規定的時間,取不到時返回null;(本方法不堵塞當前運行方法的線程)

總結一下BlockingQueue接口中的方法,這些方法以四種形式出現,對於不能馬上滿足但可能在將來某一時刻能夠滿足的操作,這四種形式的處理方式不同:第一種是拋出一個異常,另外一種是返回一個特殊值(null或 false,詳細取決於操作),第三種是在操作能夠成功前,無限期地堵塞當前線程,第四種是在放棄前僅僅在給定的最大時間限制內堵塞。

拋出異常

特殊值

堵塞

超時

插入

add(e)

offer(e)

put(e)

offer(e,time,unit)

移除

remove()

poll()

take()

poll(time,unit)

檢查

element()

peek()

不可用

不可用

BlockingQueue實現類

技術分享

1)ArrayBlockingQueue:基於數組實現的一個堵塞隊列。在創建ArrayBlockingQueue對象時必須制定容量大小。以便緩存隊列中數據對象。而且可以指定公平性與非公平性,默認情況下為非公平的,即不保證等待時間最長的隊列最優先可以訪問隊列。

其所含的對象是以FIFO(先入先出)順序排序的.

2)LinkedBlockingQueue:基於鏈表實現的一個堵塞隊列。在創建LinkedBlockingQueue對象時假設不指定容量大小,則默認大小為Integer.MAX_VALUE。其所含的對象是以FIFO(先入先出)順序排序的

3)PriorityBlockingQueue:類似於LinkedBlockQueue,但其所含對象的排序不是FIFO,它會依照元素的優先級對元素進行排序,依照優先級順序出隊,每次出隊的元素都是優先級最高的元素。註意,此堵塞隊列為無界堵塞隊列,即容量沒有上限(通過源代碼就能夠知道,它沒有容器滿的信號標誌),前面2種都是有界隊列。

4)DelayQueue:基於PriorityQueue,一種延時堵塞隊列,DelayQueue中的元素僅僅有當其指定的延遲時間到了。才可以從隊列中獲取到該元素。DelayQueue也是一個無界隊列,因此往隊列中插入數據的操作(生產者)永遠不會被堵塞,而僅僅有獲取數據的操作(消費者)才會被堵塞。

5)SynchronousQueue:一種無緩沖的等待隊列,類似於無中介的直接交易。

當中LinkedBlockingQueue和ArrayBlockingQueue比較起來,它們背後所用的數據結構不一樣,導致LinkedBlockingQueue的數據吞吐量要大於ArrayBlockingQueue,但在線程數量非常大時其性能的可預見性低於ArrayBlockingQueue.

總結

我們這篇文章延續了上一篇文章中關於ThreadPoolExecutor線程池的一些內容,各自是newCachedThreadPool、newFixedThreadPool、newSingleThreadExecutor,同一時候依據這些線程池與ThreadPoolExecutor的關系。進而引出了堵塞隊列BlockingQueue。於是我們具體介紹了接口BlockingQueue和接口中的方法,最後又介紹了接口BlockingQueue的實現類。

跟我學Java多線程——線程池與堵塞隊列