1. 程式人生 > >Java中的執行緒池(1)----執行緒池基礎知識和CachedThreadPool

Java中的執行緒池(1)----執行緒池基礎知識和CachedThreadPool

本文探討一下java中的執行緒池

首先,什麼是執行緒池?

執行緒池通過多個任務重用執行緒,執行緒建立的開銷就被分攤到了多個任務上,而且請求到達時執行緒已經存在,消除了等待執行緒建立帶來的延遲,使得程式響應更快。

執行緒池應該至少包括

1、執行緒池管理器:建立、銷燬管理執行緒池,將工作執行緒放入執行緒池中。

2、工作執行緒:迴圈執行任務的執行緒,在沒有任務時進行等待。

3、任務佇列:緩衝機制,將沒有處理的任務放入任務佇列中。

4、任務介面:規定任務的入口、任務執行完的收尾工作、任務執行狀態等,任務排程演算法應該寫在這裡。

執行緒池的優點是什麼?

執行緒池主要用來解決執行緒生命週期開銷問題和資源不足問題。

設想每當一個請求到達就建立一個新執行緒,開銷是挺大的,甚至在建立和銷燬執行緒上花的時間和消耗的資源要大於處理使用者請求的時間和資源。另外如果建立執行緒太多,可能導致系統由於過度消耗記憶體和切換過度導致系統資源不足,因而可以通過執行緒池儘可能減少建立和銷燬執行緒的次數,利用已有的物件進行服務。

執行緒池存在的問題是什麼?

執行緒池的使用也是存在風險的,比如一樣存在和其他多執行緒程式存在的併發風險,如同步錯誤、死鎖,還有執行緒池特有的風險,如資源不足、執行緒洩露。

執行緒洩露

其他的問題我的其他文章都說過了,就只說一下執行緒洩露。

當執行緒池中除去一個執行緒去執行一項任務的時候,任務完成之後執行緒卻沒有返回執行緒池,就會發生執行緒洩露。比如在執行任務時丟擲RuntimeException或者一個Error,如果沒有catch到,執行緒池執行緒數量永久減1。當這樣的情況發生次數足夠多時,執行緒池就沒有執行緒來處理任務了。

Java中ThreadPoolExecutor類

JDK中提供了java.util.concurrent.ThreadPoolExecutor類,以此來實現執行緒池:

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
        BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
這個類提供了4個構造方法,實際上前3個構造方法都是呼叫第4個構造方法進行初始化。

介紹一下構造方法中的每個引數:

1、corePoolSize:核心池的大小。預設情況下建立執行緒池之後執行緒池沒有執行緒,而是等待任務到來才建立執行緒去執行任務,除非呼叫了prestartAllCoreThreads()或者prestartCoreThread()方法,進行執行緒預建立,建立corePoolSize個執行緒。

2、maximumPoolSize:執行緒池最大執行緒個數,表示執行緒池中最多能建立的執行緒個數。

3、keepAliveTime:執行緒多久沒有任務執行就會終止預設情況下,當執行緒數目大於corePoolSize的時候,keepAliveTime才會起作用,直到執行緒池中執行緒數目不大於corePoolSize。但是如果呼叫了allowCoreThreadTimeOut(boolean)之後,執行緒池中的執行緒個數不大於corePoolSize的時候,keepAliveTime引數也會起作用,直到執行緒個數為0。

4、unit:引數keepAliveTime引數的時間單位,7種靜態屬性取值:

TimeUnit.DAYS;               //天
TimeUnit.HOURS;             //小時
TimeUnit.MINUTES;           //分鐘
TimeUnit.SECONDS;           //秒
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //納秒
5、workQueue:一個阻塞佇列,用來儲存等待執行的任務,一般阻塞佇列有以下幾種選擇:

ArrayBlockingQueue

LinkedBlockingQueue

SynchronousQueue

執行緒池的排隊策略和阻塞佇列的有關。

6、threadFactory:執行緒工廠,用來建立執行緒的工廠類

7、handler:表示拒絕處理任務時的策略,一般有四種取值:

ThreadPoolExecutor.AbortPolicy:丟棄任務並丟擲RejectedExecutionException異常。 
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不丟擲異常。 
ThreadPoolExecutor.DiscardOldestPolicy:丟棄佇列最前面的任務,然後重新嘗試執行任務(重複此過程)
ThreadPoolExecutor.CallerRunsPolicy:由呼叫執行緒處理該任務

ThreadPoolExecutor類中非常重要的方法:

1、execute():向執行緒池提交任務,由執行緒池去執行

2、submit():也是向執行緒池提交任務,但是能返回任務執行結果,實際上還是呼叫execute()方法,利用了Future Pattern。

3、shutdown():將執行緒池狀態置為SHUTDOWN狀態,之後不能往執行緒池新增任何任務,否則都會丟擲RejectedExecutionException異常。但是執行緒池不會立刻退出,而是直到執行緒池中所有任務都處理完,才退出。

4、shutdownNow():將執行緒池狀態立刻變成STOP狀態,試圖停止所有正在執行的執行緒,不在處理還在佇列中等待的任務,然後返回未執行的任務。

對於ThreadPoolExecutor類先了解到這,這個類的第四個建構函式的構造引數多達7個,但是幸運的是jdk提供了4個構造不同種類執行緒池的靜態方法供我們選擇。

1、CachedThreadPool

首先看一下這個執行緒池對應的構造方法:

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
也就是當我們執行newCachedThreadPool()靜態方法的時候實際上是執行了ThreadPoolExecutor()建構函式,而引數含義代表:

核心池大小為0

執行緒池最大執行緒數目為最大整型

當執行緒池中的執行緒60s沒有執行任務就終止

阻塞佇列為SynchronousQueue,SynchronousQueue是個有什麼特點的阻塞佇列?

a、每個put操作必須等待一個take操作,反之亦然(相當於管道)

b、不允許null元素

c、是執行緒安全的,阻塞的

d、iterator()永遠為空,peek()永遠返回null,isEmpty()永遠是true,remove()/removeAll()永遠是false。

e、內部佇列沒有任何內部容量

SynchronousQueue

我們對SynchronousQueue做一個小測試:

SynchronousQueue<Integer> sq = new SynchronousQueue<Integer>();
sq.put(1);
sq.take();
這種情況下程式會一直阻塞下去,因為SynchronousQueue的put操作必須等待其take操作。

我們需要以兩個不同執行緒的形式來通過SynchronousQueue傳輸資料:

建立一個TakeThread類:

package newCachedThreadPool;

import java.util.concurrent.SynchronousQueue;

public class TakeThread extends Thread {
    private SynchronousQueue<Integer> sq;
    @SuppressWarnings("unused")
    private String name;
    public TakeThread(SynchronousQueue<Integer> sq,String name){
        super(name);
        this.sq = sq;
    }
    @Override
    public void run(){
        while(true){
            try {
                Integer i = sq.take();
                System.out.println(Thread.currentThread().getName()+"取出"+i);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
一個PutThread類:
package newCachedThreadPool;

import java.util.concurrent.SynchronousQueue;

public class PutThread extends Thread {
    private SynchronousQueue<Integer> sq;
    @SuppressWarnings("unused")
    private String name;
    public PutThread(SynchronousQueue<Integer> sq,String name){
        super(name);
        this.sq = sq;
    }
    @Override
    public void run(){
        Integer i = 0;
        while(true){
            try {
                sq.put(i);
                System.out.println(Thread.currentThread().getName()+"放入"+i++);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
最後在測試類中進行測試:
package newCachedThreadPool;

public class Test{
    public static void main(String[] args){
        SynchronousQueue<Integer> sq = new SynchronousQueue<Integer>();
        new PutThread(sq,"puter").start();
        new TakeThread(sq,"taker").start();
    }
}
執行結果如下:

所以說SynchronousQueue的take操作需要put操作等待,put操作需要take操作等待,否則會阻塞,執行緒進入wait set。

這就意味著:

1、這是一個可以無限擴大的執行緒池;

2、適合處理執行時間比較小的任務;

3、執行緒空閒時間超過60s就會被殺死,所以長時間處於空閒狀態的時候,這種執行緒池幾乎不佔用資源;

4、阻塞佇列沒有儲存空間,只要請求到來,就必須找到一條空閒執行緒去處理這個請求,找不到則線上程池新開闢一條執行緒。

下面測試一下這種執行緒池:

public class Test {
    public static void main(String[] args) {
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            final int index = i;// java中可笑的閉包 得傳final型別
            cachedThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(100);//讓當前執行緒休息一會兒
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName()+"執行"+index);//列印當前執行緒
                }
            });
        }
        cachedThreadPool.shutdown();
    }
}

在這個測試當中,讓CachedThreadPool中的執行緒執行完就休息100ms,這樣使得另一個請求來的時候便建立一個新的執行緒,於是執行結果是這樣的:

每當一個新的請求來的時候,由於之前的執行緒不是空閒狀態,而且這種執行緒池的阻塞佇列不能儲存,所以需要開闢新的執行緒來處理這個請求,因此就出現了這樣的情況,對於10個請求執行緒池開闢了10個執行緒來進行處理。

如果我們將程式碼改一改:

public class Test {
    public static void main(String[] args) {
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            final int index = i;// java中可笑的閉包 得傳final型別
            try {
                Thread.sleep(100);//在請求處理前給之前處理請求的執行緒足夠的釋放時間 使得之前執行緒能夠繼續處理接下來的請求
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            cachedThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName()+"執行"+index);
                }
            });
        }
    }
}
這種情況執行結果如下:

由於每次新的請求加入時,執行緒池中1號執行緒都是空閒狀態,所以執行緒池就不需要開闢其他的執行緒來處理新的請求,所以就一直是1號執行緒在處理請求。