1. 程式人生 > >Java執行緒池原理和使用

Java執行緒池原理和使用

為什麼要用執行緒池?

諸如 Web 伺服器、資料庫伺服器、檔案伺服器或郵件伺服器之類的許多伺服器應用程式都面向處理來自某些遠端來源的大量短小的任務。請求以某種方式到達伺服器,這種方式可能是通過網路協議(例如 HTTP、FTP 或 POP)、通過 JMS 佇列或者可能通過輪詢資料庫。不管請求如何到達,伺服器應用程式中經常出現的情況是:單個任務處理的時間很短而請求的數目卻是巨大的。

構建伺服器應用程式的一個過於簡單的模型應該是:每當一個請求到達就建立一個新執行緒,然後在新執行緒中為請求服務。實際上,對於原型開發這種方法工作得很好,但如果試圖部署以這種方式執行的伺服器應用程式,那麼這種方法的嚴重不足就很明顯。每個請求對應一個執行緒(thread-per-request)方法的不足之一是:為每個請求建立一個新執行緒的開銷很大;為每個請求建立新執行緒的伺服器在建立和銷燬執行緒上花費的時間和消耗的系統資源要比花在處理實際的使用者請求的時間和資源更多。

除了建立和銷燬執行緒的開銷之外,活動的執行緒也消耗系統資源。在一個 JVM 裡建立太多的執行緒可能會導致系統由於過度消耗記憶體而用完記憶體或“切換過度”。為了防止資源不足,伺服器應用程式需要一些辦法來限制任何給定時刻處理的請求數目。

執行緒池為執行緒生命週期開銷問題和資源不足問題提供瞭解決方案。通過對多個任務重用執行緒,執行緒建立的開銷被分攤到了多個任務上。其好處是,因為在請求到達時執行緒已經存在,所以無意中也消除了執行緒建立所帶來的延遲。這樣,就可以立即為請求服務,使應用程式響應更快。而且,通過適當地調整執行緒池中的執行緒數目,也就是當請求的數目超過某個閾值時,就強制其它任何新到的請求一直等待,直到獲得一個執行緒來處理為止,從而可以防止資源不足。

2、ThreadPoolExecutor類介紹

Java中的執行緒池技術主要用的是ThreadPoolExecutor 這個類。先來看這個類的建構函式,

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,

BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 

    corePoolSize       

執行緒池維護執行緒的最少數量

    maximumPoolSize    執行緒池維護執行緒的最大數量 

    keepAliveTime      執行緒池維護執行緒所允許的空閒時間  

    workQueue          任務佇列,用來存放我們所定義的任務處理執行緒

    threadFactory      執行緒建立工廠

    handler            執行緒池對拒絕任務的處理策略

     ThreadPoolExecutor 將根據 corePoolSize和 maximumPoolSize 設定的邊界自動調整池大小。當新任務在方法

execute(Runnable) 中提交時, 如果執行的執行緒少於 corePoolSize,則建立新執行緒來處理請求,即使其他輔助執行緒是

空閒的。如果執行的執行緒多於 corePoolSize 而少於 maximumPoolSize,則僅當佇列滿時才建立新執行緒。 如果設定的

corePoolSize 和 maximumPoolSize 相同,則建立了固定大小的執行緒池。

     ThreadPoolExecutor是Executors類的實現Executors類裡面提供了一些靜態工廠,生成一些常用的執行緒池,主

要有以下幾個:

     newSingleThreadExecutor:建立一個單執行緒的執行緒池。這個執行緒池只有一個執行緒在工作,也就是相當於單執行緒序列執行

所有任務。如果這個唯一的執行緒因為異常結束,那麼會有一個新的執行緒來替代它。此執行緒池保證所有任務的執行順序按照任

務的提交順序執行。  

     newFixedThreadPool:建立固定大小的執行緒池。每次提交一個任務就建立一個執行緒,直到執行緒達到執行緒池的最大大小。線

程池的大小一旦達到最大值就會保持不變,如果某個執行緒因為執行異常而結束,那麼執行緒池會補充一個新執行緒。

     newCachedThreadPool:建立一個可快取的執行緒池。如果執行緒池的大小超過了處理任務所需要的執行緒,那麼就會回收部分

空閒(60秒不執行任務)的執行緒,當任務數增加時,此執行緒池又可以智慧的新增新執行緒來處理任務。此執行緒池不會對執行緒池

大小做限制,執行緒池大小完全依賴於作業系統(或者說JVM)能夠建立的最大執行緒大小。

      在實際的專案中,我們會使用得到比較多的是newFixedThreadPool,建立固定大小的執行緒池,但是這個方法在真實的線上

環境中還是會有很多問題,這個將會在下面一節中詳細講到。

      當任務源源不斷的過來,而我們的系統又處理不過來的時候,我們要採取的策略是拒絕服務。RejectedExecutionHandler接

口提供了拒絕任務處理的自定義方法的機會。在ThreadPoolExecutor中已經包含四種處理策略。

      1)CallerRunsPolicy:執行緒呼叫執行該任務的 execute 本身。此策略提供簡單的反饋控制機制,能夠減緩新任務的提交速度。

          public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

             if (!e.isShutdown()) {

                 r.run();

            }

        }

這個策略顯然不想放棄執行任務。但是由於池中已經沒有任何資源了,那麼就直接使用呼叫該execute的執行緒本身來執行。

     2)AbortPolicy處理程式遭到拒絕將丟擲執行時 RejectedExecutionException

         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

              throw new RejectedExecutionException();

        }

 這種策略直接丟擲異常,丟棄任務。

      3)DiscardPolicy不能執行的任務將被刪除

          public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}

   這種策略和AbortPolicy幾乎一樣,也是丟棄任務,只不過他不丟擲異常。

     4)DiscardOldestPolicy如果執行程式尚未關閉,則位於工作佇列頭部的任務將被刪除,然後重試執行程式(如果再次失敗,

則重複此過程)

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

            if (!e.isShutdown()) {

                e.getQueue().poll();

                e.execute(r);

            }

        }

      該策略就稍微複雜一些,在pool沒有關閉的前提下首先丟掉快取在佇列中的最早的任務,然後重新嘗試執行該任務。這個策略

需要適當小心

3、 ThreadPoolExecutor無界佇列使用
   public class ThreadPool {

        private final static String poolName = "mypool";

        static private ThreadPool threadFixedPool = new ThreadPool(2);

       private ExecutorService executor;

      static public ThreadPool getFixedInstance() {

           return threadFixedPool;

       }

    private ThreadPool(int num) {

           executor = Executors.newFixedThreadPool(num, new DaemonThreadFactory(poolName));

}

public void execute(Runnable r) {

           executor.execute(r);

}

public static void main(String[] params) {

           class MyRunnable implements Runnable {

                    public void run() {

                             System.out.println("OK!");

                             try {

                                       Thread.sleep(10);

                             } catch (InterruptedException e) {

                                       e.printStackTrace();

                             }

                    }

           }

           for (int i = 0; i < 10; i++) {

             ThreadPool.getFixedInstance().execute(new MyRunnable());

           }

           try {

                    Thread.sleep(2000);

                    System.out.println("Process end.");

           } catch (InterruptedException e) {

                    e.printStackTrace();

           }

}

}

    在這段程式碼中,我們發現我們用到了Executors.newFixedThreadPool()函式,這個函式的實現是這樣子的:

return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); 

       它實際上是建立了一個無界佇列的固定大小的執行緒池。執行這段程式碼,我們發現所有的任務都正常處理了。但是在真實的線上環

境中會存在這樣的一個問題,前端的使用者請求源源不斷的過來,後端的處理執行緒如果處理時間變長,無法快速的將使用者請求處理

完返回結果給前端,那麼任務佇列中將堵塞大量的請求。這些請求在前端都是有超時時間設定的,假設請求是通過套接字過來,

當我們的後端處理程序處理完一個請求後,從佇列中拿下一個任務,發現這個任務的套接字已經無效了,這是因為在使用者端已經

超時,將套接字建立的連線關閉了。這樣一來我們這邊的處理程式再去讀取套接字時,就會發生I/0 Exception. 惡性迴圈,導致我

們所有的處理服務執行緒讀的都是超時的套接字,所有的請求過來都拋I/O異常,這樣等於我們整個系統都掛掉了,已經無法對外提供

正常的服務了。

     對於海量資料的處理,現在業界都是採用集群系統來進行處理,當請求的數量不斷加大的時候,我們可以通過增加處理節點,反正現

在硬體裝置相對便宜。但是要保證系統的可靠性和穩定性,在程式方面我們還是可以進一步的優化的,我們下一節要講述的就是針對

線上出現的這類問題的一種處理策略。



4、ThreadPoolExecutor有界佇列使用

public class ThreadPool {

         private final static String poolName = "mypool";

         static private ThreadPool threadFixedPool = null;

         public ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(2);

         private ExecutorService executor;

 

         static public ThreadPool getFixedInstance() {

                   return threadFixedPool;

         }

         private ThreadPool(int num) {

                   executor = new ThreadPoolExecutor(2, 4,60,TimeUnit.SECONDS, queue,new DaemonThreadFactory

(poolName), new ThreadPoolExecutor.AbortPolicy());

         }

         public void execute(Runnable r) {

                   executor.execute(r);

         }

        

         public static void main(String[] params) {

                   class MyRunnable implements Runnable {

                            public void run() {

                                     System.out.println("OK!");

                                     try {

                                               Thread.sleep(10);

                                     } catch (InterruptedException e) {

                                               e.printStackTrace();

                                     }

                            }

                   }

                   int count = 0;

                   for (int i = 0; i < 10; i++) {

                            try {

                                     ThreadPool.getFixedInstance().execute(new MyRunnable());

                            } catch (RejectedExecutionException e) {

                                     e.printStackTrace();

                                     count++;

                            }

                   }

                   try {

                            log.info("queue size:" + ThreadPool.getFixedInstance().queue.size());

                            Thread.sleep(2000);

                   } catch (InterruptedException e) {

                            e.printStackTrace();

                   }

                   System.out.println("Reject task: " + count);

         }

}

  首先我們來看下這段程式碼幾個重要的引數,corePoolSize 為2,maximumPoolSize為4,任務佇列大小為2,每個任務平

均處理時間為10ms,一共有10個併發任務。

      執行這段程式碼,我們會發現,有4個任務失敗了。這裡就驗證了我們在上面提到有界佇列時候執行緒池的執行順序。當新任務在

方法 execute(Runnable) 中提交時, 如果執行的執行緒少於 corePoolSize,則建立新執行緒來處理請求。 如果執行的執行緒多於

corePoolSize 而少於 maximumPoolSize,則僅當佇列滿時才建立新執行緒,如果此時執行緒數量達到maximumPoolSize,並且隊

列已經滿,就會拒絕繼續進來的請求。

    現在我們調整一下程式碼中的幾個引數,將併發任務數改為200,執行結果Reject task: 182,說明有18個任務成功了,執行緒處理

完一個請求後會接著去處理下一個過來的請求。在真實的線上環境中,會源源不斷的有新的請求過來,當前的被拒絕了,但只要線

程池執行緒把當下的任務處理完之後還是可以處理下一個傳送過來的請求。

     通過有界佇列可以實現系統的過載保護,在高壓的情況下,我們的系統處理能力不會變為0,還能正常對外進行服務。

5、舉例分析

  • execute(Runnable command):履行Ruannable型別的任務
  • submit(task):可用來提交Callable或Runnable任務,並返回代表此任務的Future物件
  • invokeAll(collection of tasks):執行給定的任務,當所有任務完成時,返回保持任務狀態和結果的 Future 列表.
  • shutdown():在完成已提交的任務後封閉辦事,不再接管新任務
  • shutdownNow():停止所有正在履行的任務並封閉辦事。
  • isTerminated():測試是否所有任務都履行完畢了。
  • isShutdown():測試是否該ExecutorService已被封閉

1、固定大小執行緒池

import java.util.concurrent.Executors;  
import java.util.concurrent.ExecutorService;

ExecutorService pool = Executors.newFixedThreadPool(2);

pool.execute(t1);

pool.shutdown();

2、單任務執行緒池

ExecutorService pool = Executors.newSingleThreadExecutor();

3、可變尺寸執行緒池

ExecutorService pool = Executors.newCachedThreadPool();

4、延遲連線池

import java.util.concurrent.Executors;  
import java.util.concurrent.ScheduledExecutorService;  
import java.util.concurrent.TimeUnit;

ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);

pool.schedule(t4, 10, TimeUnit.MILLISECONDS);

5、單任務延遲連線池

ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();

例子分析:

 schedule(Runnable command, long delay, TimeUnit unit),schedule方法被用來延遲指定時間後執行某個指定任務。

public class Job implements Runnable { 
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss"); 
public void run() { 
try { 
            Thread.sleep(5000); 
        } catch (InterruptedException ex) { 
            ex.printStackTrace(); 
        } 
        System.out.println("do something  at:" + sdf.format(new Date())); 
    } 
} 
public class ScheduledExecutorServiceTest { 
public static void main(String[] args) { 
        ScheduledExecutorService schedule = Executors.newScheduledThreadPool(5); 
final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss"); 
        System.out.println(" begin to do something at:" + sdf.format(new Date())); 
        schedule.schedule(new Job(),1, TimeUnit.SECONDS); 
    } 
} 

輸出如下:

  1. begin to do something at:2012-08-03 09:31:36
  2. do something  at:2012-08-03 09:31:42

注:此時程式不會推出,若想讓程式推出,需要加上schedule.shutdown();

ScheduledExecutorService 中兩種最常用的排程方法 ScheduleAtFixedRate 和 ScheduleWithFixedDelay。ScheduleAtFixedRate 每次執行時間為上一次任務開始起向後推一個時間間隔,即每次執行時間為 :initialDelay, initialDelay+period, initialDelay+2*period, …;ScheduleWithFixedDelay 每次執行時間為上一次任務結束起向後推一個時間間隔,即每次執行時間為:initialDelay, initialDelay+executeTime+delay, initialDelay+2*executeTime+2*delay。由此可見,ScheduleAtFixedRate 是基於固定時間間隔進行任務排程,ScheduleWithFixedDelay 取決於每次任務執行的時間長短,是基於不固定時間間隔進行任務排程。

2.scheduleWithFixedDelay 
         scheduleWithFixedDelay(Runnable command, long initialDelay, long delay,TimeUnit unit) 
         建立並執行一個在給定初始延遲後首次啟用的定期操作,隨後,在每一次執行終止和下一次執行開始之間都存在給定的延遲,如果任務的執行時間超過了廷遲時間(delay),下一個任務則會在 
(當前任務執行所需時間+delay)後執行。 

public class ScheduledExecutorServiceTest { 
public static void main(String[] args) { 
            ScheduledExecutorService schedule = Executors.newScheduledThreadPool(5); 
final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss"); 
            System.out.println(" begin to do something at:" + sdf.format(new Date())); 
            schedule.scheduleWithFixedDelay(new Job(), 1, 2, TimeUnit.SECONDS); 
        } 
    } 

   輸出如下:

  1. begin to do something at:2012-08-03 09:36:53
  2. do something at:2012-08-03 09:36:59
  3. do something at:2012-08-03 09:37:06
  4. do something at:2012-08-03 09:37:13

3.scheduleAtFixedRate 
         scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnitunit) 
         建立並執行一個在給定初始延遲後首次啟用的定期操作,後續操作具有給定的週期;也就是將在 initialDelay 後開始執行,然後在initialDelay+period 後執行,接著在 initialDelay + 2 * period 後執行,依此類推。 如果任務的執行時間小於period,將會按上述規律執行。否則,則會按 任務的實際執行時間進行週期執行。 
    

public class ScheduledExecutorServiceTest { 
public static void main(String[] args) { 
        ScheduledExecutorService schedule = Executors.newScheduledThreadPool(2); 
final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss"); 
        System.out.println(" begin to do something at:" + sdf.format(new Date())); 
        schedule.scheduleAtFixedRate(new Job(), 1,2, TimeUnit.SECONDS); 
    } 

結果輸出:

  1. begin to do something at:2012-08-04 08:53:30
  2. do something at:2012-08-04 08:53:36
  3. do something at:2012-08-04 08:53:41
  4. do something at:2012-08-04 08:53:46
  5. do something at:2012-08-04 08:53:51