1. 程式人生 > >執行緒池ThreadPoolExecutor配置、引數詳解及例子

執行緒池ThreadPoolExecutor配置、引數詳解及例子

     對於執行緒池,我僅限於簡單的使用,對其原理和引數並不是很理解。以前只是在網上找個執行緒池的例子,然後模仿寫一下而已,最近專案中又再次用的執行緒池,做了一些研究,現記錄下,以備以後使用。

    我以前利用執行緒池只會這樣用:

ExecutorService pool = Executors.newFixedThreadPool(5);
pool.execute(new Runnable() {
    @Override
    public void run() {
        //具體執行緒要乾的活
    }
});

這個只是建立一個固定執行緒數為5的執行緒池(當然還有其他3種

這種寫法有一定的弊端,當系統併發的增長量遠遠高於執行緒池的消費量(比如系統是每秒中增加20個併發,而執行緒池是每秒鐘執行5個執行緒)。這就有可能造成伺服器記憶體溢位的風險。原因是newFixedThreadPool採用了無邊界佇列。可以從原始碼中一窺究竟。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

這裡LinkedBlockingQueue是無邊界佇列。ThreadPoolExecutor構造中的引數文章下面會逐一講解。總之上面的寫法不靈活,對於一些併發高的系統是不滿足的。

下面我們開始步入正題,開始研究下ThreadPoolExecutor執行緒池。

第一步,研究ThreadPoolExecutor執行緒池有幾種構造方法

第一個構造:
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

第二個構造:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         threadFactory, defaultHandler);
}

第三個構造:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}

第四個構造:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

總結:從上面四個構造可以看出,其實前三個構造都是呼叫的第四個構造方法。所以我們研究第四個構造即可。 

第二步,研究第四個構造的每個引數

corePoolSize: 執行緒池核心執行緒數量   int型別

maximumPoolSize:執行緒池允許執行緒的最大數量      int型別

keepAliveTime: 執行緒池中執行緒所允許的空閒時間。long型別

      JDK解釋:當執行緒數大於核心時,此為終止前多餘的空閒執行緒等待新任務的最長時間。

       也就是說啊,執行緒池中當前的空閒執行緒服務完某任務後的存活時間。如果時間足夠長,那麼可能會服務其它任務。

       這些空閒執行緒指的是新建執行緒還是核心執行緒,還是兩者都包含。這個不是很清楚。(哪位大神知道可以告訴我下)

unit: 執行緒池維護執行緒所允許的空閒時間的單位 

   MICROSECONDS    微秒   一百萬分之一秒(就是毫秒/1000)
   MILLISECONDS    毫秒   千分之一秒    
   NANOSECONDS   毫微秒  十億分之一秒(就是微秒/1000)
   SECONDS          秒
   MINUTES     分鐘
   HOURS      小時
   DAYS      天

 

workQueue: 執行緒池所使用的緩衝佇列(關於佇列的詳解在後面第四步)

        直接提交 SynchronousQueue

        無界佇列 如LinkedBlockingQueue

        有界佇列 如ArrayBlockingQueue

handler: 執行緒池對拒絕任務的處理策略

       ThreadPoolExecutor.AbortPolicy()  丟擲java.util.concurrent.RejectedExecutionException異常。 

        ThreadPoolExecutor.CallerRunsPolicy()  重試添加當前的任務,他會自動重複呼叫execute()方法。 

        ThreadPoolExecutor.DiscardOldestPolicy()  拋棄舊的任務  

         ThreadPoolExecutor.DiscardPolicy()  拋棄當前的任務。 

threadFactory:執行緒工廠,主要用來建立執行緒:預設值 DefaultThreadFactory

第三步,有一個例子,對執行緒池描述的非常貼切。

   有一個工廠,核定工人數量為10,每個工人同時只做一件事,

   因此只要當10個工人中有工人是空閒的,來了任務就分配給空閒的工人做;

 當10個工人都有任務在做時,如果還來了任務,就把任務進行排隊等待

   如果佇列workQueue的容量為1,且現在10個工人都在幹活,

              又來一個任務A,則把A放到到佇列中等待有空閒工人來從佇列中拿任務。

              但是有這麼一種情況,來的不是一個任務,是5個任務,且10個工人都沒有空閒,且佇列也放滿了(1個任務就滿了哦)

               這時候,老闆一看來的活多,幹不過來了,怎麼辦,老闆就想招了,再招5個臨時工吧,這時候工廠就是15個工人

總結:corePoolSize=10;maximumPoolSize=15。當任務數大於corePoolSize時,是先向佇列裡放,當佇列裡也滿了,還有任務不斷進來,才新建執行緒哦。且最大不會超過maximumPoolSize。

問題來了,如果佇列滿了,且執行緒數到最大執行緒數了,還進來任務怎麼辦呢。這就用到了handler異常策略(或者叫飽和策略)。

第四步,緩衝佇列詳解。

4.1、直接提交 SynchronousQueue

       該佇列是將任務直接提交給執行緒而不儲存它們。在此,如果不存在空閒的執行緒,則試圖把任務加入佇列將失敗,因此會構造一個新的執行緒。此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務。當命令以超過佇列所能處理的平均數連續到達時,此策略允許無界執行緒具有增長的可能性。           SynchronousQueue執行緒安全的Queue,可以存放若干任務(但當前只允許有且只有一個任務在等待),其中每個插入操作必須等待另一個執行緒的對應移除操作,也就是說A任務進入佇列,B任務必須等A任務被移除之後才能進入佇列,否則執行異常策略。你來一個我扔一個,所以說SynchronousQueue沒有任何內部容量。

       比如:核心執行緒數為2,最大執行緒數為3;使用SynchronousQueue。

       當前有2個核心執行緒在執行,又來了個A任務,兩個核心執行緒沒有執行完當前任務,根據如果執行的執行緒等於或多於 corePoolSize,

      則 Executor 始終首選將請求加入佇列,而不新增新的執行緒。所以A任務被新增到佇列,此時的佇列是SynchronousQueue,

      當前不存在可用於立即執行任務的執行緒,因此會構造一個新的執行緒,此時又來了個B任務,兩個核心執行緒還沒有執行完。

       新建立的執行緒正在執行A任務,所以B任務進入Queue後,最大執行緒數為3,發現沒地方仍了。就只能執行異常策略(RejectedExecutionException)。

4.2、無界佇列  LinkedBlockingQueue

        使用無界佇列(例如,不具有預定義容量的 LinkedBlockingQueue)將導致在所有核心執行緒都在忙時新任務在佇列中等待。這樣,建立的執行緒就不會超過 corePoolSize。(因此,maximumPoolSize 的值也就沒意義了。)也就不會有新執行緒被建立,都在那等著排隊呢。如果未指定容量,則它等於 Integer.MAX_VALUE。如果設定了Queue預定義容量,則當核心執行緒忙碌時,新任務會在佇列中等待,直到超過預定義容量(新任務沒地方放了),才會執行異常策略。你來一個我接一個,直到我容不下你了。FIFO,先進先出。

        比如:核心執行緒數為2,最大執行緒數為3;使用LinkedBlockingQueue(1),設定容量為1。

        當前有2個核心執行緒在執行,又來了個A任務,兩個核心執行緒沒有執行完當前任務,根據如果執行的執行緒等於或多於 corePoolSize,

        則 Executor 始終首選將請求加入佇列,而不新增新的執行緒。所以A任務被新增到佇列,此時的佇列是LinkedBlockingQueue,

        此時又來了個B任務,兩個核心執行緒沒有執行完當前任務,A任務在佇列中等待,佇列已滿。所以根據如果無法將請求加入佇列,則建立新的執行緒,

         B任務被新建立的執行緒所執行,此時又來個C任務,此時maximumPoolSize已滿,佇列已滿,只能執行異常策略(RejectedExecutionException)。

4.3 有界佇列  ArrayBlockingQueue

       操作模式跟LinkedBlockingQueue查不多,只不過必須為其設定容量。所以叫有界佇列。new ArrayBlockingQueue<Runnable>(Integer.MAX_VALUE) 跟 new LinkedBlockingQueue(Integer.MAX_VALUE)效果一樣。LinkedBlockingQueue 底層是連結串列結構,ArrayBlockingQueue  底層是陣列結構。你來一個我接一個,直到我容不下你了。FIFO,先進先出。

總結下:

使用無界佇列,要防止任務增長的速度遠遠超過處理任務的速度,控制不好可能導致的結果就是記憶體溢位。

使用有界佇列,關鍵在於調節執行緒數和Queue大小 ,執行緒數多,佇列容量少,資源浪費。執行緒數少,佇列容量多,效能低,還可能導致記憶體溢位。

第五步,我的一個實際應用例子程式碼如下(我採用的是xml配置式,更加靈活)

5.1、xml中配置執行緒池ThreadPoolExecutor

<!--執行緒池配置開始 -->
   <bean id="linkQueue" class="java.util.concurrent.LinkedBlockingQueue">
       <constructor-arg name="capacity" value="${thread.pool.linkQueue.size}"/>
   </bean>
   <bean id="pool" class="java.util.concurrent.ThreadPoolExecutor">
       <constructor-arg name="corePoolSize" value="${thread.pool.core.size}"/>
       <constructor-arg name="maximumPoolSize" value="${thread.pool.max.size}"/>
       <constructor-arg name="keepAliveTime" value="${thread.pool.keep.alive.time}"/>
       <constructor-arg name="unit" value="${thread.pool.time.unit}"/>
       <constructor-arg name="workQueue" ref="linkQueue"/>
   </bean>
<!--執行緒池配置結束 -->

5.2、properties檔案中配置如下:

#執行緒池相關配置
#LinkedBlockingQueue佇列容量
thread.pool.linkQueue.size=10
#執行緒池核心執行緒數
thread.pool.core.size=5
#執行緒池最大執行緒數
thread.pool.max.size=10
#空閒執行緒最大存活時間
thread.pool.keep.alive.time=2
/#時間單位 SECONDS(秒) MILLISECONDS(毫秒) MICROSECONDS(微秒)
thread.pool.time.unit=SECONDS

5.3、後臺java程式碼

public class SaveEventAdviceHolder {


    @Autowired
    private ThreadPoolExecutor pool;

    public void saveSerializeObjEvent(Object[] objects,SaveEventBO bo){
        //設定執行緒池的異常策略為CallerRunsPolicy()
        RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
        pool.setRejectedExecutionHandler(handler);
        pool.execute(new Runnable() {
            public void run() {
               //執行緒具體執行任務.....

        });
    }
 }