Java中的執行緒池(1)----執行緒池基礎知識和CachedThreadPool
本文探討一下java中的執行緒池
首先,什麼是執行緒池?
執行緒池通過多個任務重用執行緒,執行緒建立的開銷就被分攤到了多個任務上,而且請求到達時執行緒已經存在,消除了等待執行緒建立帶來的延遲,使得程式響應更快。
執行緒池應該至少包括
1、執行緒池管理器:建立、銷燬管理執行緒池,將工作執行緒放入執行緒池中。
2、工作執行緒:迴圈執行任務的執行緒,在沒有任務時進行等待。
3、任務佇列:緩衝機制,將沒有處理的任務放入任務佇列中。
4、任務介面:規定任務的入口、任務執行完的收尾工作、任務執行狀態等,任務排程演算法應該寫在這裡。
執行緒池的優點是什麼?
執行緒池主要用來解決執行緒生命週期開銷問題和資源不足問題。
執行緒池存在的問題是什麼?
執行緒池的使用也是存在風險的,比如一樣存在和其他多執行緒程式存在的併發風險,如同步錯誤、死鎖,還有執行緒池特有的風險,如資源不足、執行緒洩露。
執行緒洩露
其他的問題我的其他文章都說過了,就只說一下執行緒洩露。
當執行緒池中除去一個執行緒去執行一項任務的時候,任務完成之後執行緒卻沒有返回執行緒池,就會發生執行緒洩露。比如在執行任務時丟擲RuntimeException或者一個Error,如果沒有catch到,執行緒池執行緒數量永久減1。當這樣的情況發生次數足夠多時,執行緒池就沒有執行緒來處理任務了。
Java中ThreadPoolExecutor類
JDK中提供了java.util.concurrent.ThreadPoolExecutor類,以此來實現執行緒池:
這個類提供了4個構造方法,實際上前3個構造方法都是呼叫第4個構造方法進行初始化。public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
介紹一下構造方法中的每個引數:
1、corePoolSize:核心池的大小。預設情況下建立執行緒池之後執行緒池沒有執行緒,而是等待任務到來才建立執行緒去執行任務,除非呼叫了prestartAllCoreThreads()或者prestartCoreThread()方法,進行執行緒預建立,建立corePoolSize個執行緒。
2、maximumPoolSize:執行緒池最大執行緒個數,表示執行緒池中最多能建立的執行緒個數。
3、keepAliveTime:執行緒多久沒有任務執行就會終止。預設情況下,當執行緒數目大於corePoolSize的時候,keepAliveTime才會起作用,直到執行緒池中執行緒數目不大於corePoolSize。但是如果呼叫了allowCoreThreadTimeOut(boolean)之後,執行緒池中的執行緒個數不大於corePoolSize的時候,keepAliveTime引數也會起作用,直到執行緒個數為0。
4、unit:引數keepAliveTime引數的時間單位,7種靜態屬性取值:
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小時
TimeUnit.MINUTES; //分鐘
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //納秒
5、workQueue:一個阻塞佇列,用來儲存等待執行的任務,一般阻塞佇列有以下幾種選擇:
ArrayBlockingQueue
LinkedBlockingQueue
SynchronousQueue
執行緒池的排隊策略和阻塞佇列的有關。
6、threadFactory:執行緒工廠,用來建立執行緒的工廠類
7、handler:表示拒絕處理任務時的策略,一般有四種取值:
ThreadPoolExecutor.AbortPolicy:丟棄任務並丟擲RejectedExecutionException異常。
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不丟擲異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄佇列最前面的任務,然後重新嘗試執行任務(重複此過程)
ThreadPoolExecutor.CallerRunsPolicy:由呼叫執行緒處理該任務
ThreadPoolExecutor類中非常重要的方法:
1、execute():向執行緒池提交任務,由執行緒池去執行
2、submit():也是向執行緒池提交任務,但是能返回任務執行結果,實際上還是呼叫execute()方法,利用了Future Pattern。
3、shutdown():將執行緒池狀態置為SHUTDOWN狀態,之後不能往執行緒池新增任何任務,否則都會丟擲RejectedExecutionException異常。但是執行緒池不會立刻退出,而是直到執行緒池中所有任務都處理完,才退出。
4、shutdownNow():將執行緒池狀態立刻變成STOP狀態,試圖停止所有正在執行的執行緒,不在處理還在佇列中等待的任務,然後返回未執行的任務。
對於ThreadPoolExecutor類先了解到這,這個類的第四個建構函式的構造引數多達7個,但是幸運的是jdk提供了4個構造不同種類執行緒池的靜態方法供我們選擇。
1、CachedThreadPool
首先看一下這個執行緒池對應的構造方法:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
也就是當我們執行newCachedThreadPool()靜態方法的時候實際上是執行了ThreadPoolExecutor()建構函式,而引數含義代表:
核心池大小為0
執行緒池最大執行緒數目為最大整型
當執行緒池中的執行緒60s沒有執行任務就終止
阻塞佇列為SynchronousQueue,SynchronousQueue是個有什麼特點的阻塞佇列?
a、每個put操作必須等待一個take操作,反之亦然(相當於管道)
b、不允許null元素
c、是執行緒安全的,阻塞的
d、iterator()永遠為空,peek()永遠返回null,isEmpty()永遠是true,remove()/removeAll()永遠是false。
e、內部佇列沒有任何內部容量
SynchronousQueue
我們對SynchronousQueue做一個小測試:
SynchronousQueue<Integer> sq = new SynchronousQueue<Integer>();
sq.put(1);
sq.take();
這種情況下程式會一直阻塞下去,因為SynchronousQueue的put操作必須等待其take操作。
我們需要以兩個不同執行緒的形式來通過SynchronousQueue傳輸資料:
建立一個TakeThread類:
package newCachedThreadPool;
import java.util.concurrent.SynchronousQueue;
public class TakeThread extends Thread {
private SynchronousQueue<Integer> sq;
@SuppressWarnings("unused")
private String name;
public TakeThread(SynchronousQueue<Integer> sq,String name){
super(name);
this.sq = sq;
}
@Override
public void run(){
while(true){
try {
Integer i = sq.take();
System.out.println(Thread.currentThread().getName()+"取出"+i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
一個PutThread類:
package newCachedThreadPool;
import java.util.concurrent.SynchronousQueue;
public class PutThread extends Thread {
private SynchronousQueue<Integer> sq;
@SuppressWarnings("unused")
private String name;
public PutThread(SynchronousQueue<Integer> sq,String name){
super(name);
this.sq = sq;
}
@Override
public void run(){
Integer i = 0;
while(true){
try {
sq.put(i);
System.out.println(Thread.currentThread().getName()+"放入"+i++);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
最後在測試類中進行測試:
package newCachedThreadPool;
public class Test{
public static void main(String[] args){
SynchronousQueue<Integer> sq = new SynchronousQueue<Integer>();
new PutThread(sq,"puter").start();
new TakeThread(sq,"taker").start();
}
}
執行結果如下:所以說SynchronousQueue的take操作需要put操作等待,put操作需要take操作等待,否則會阻塞,執行緒進入wait set。
這就意味著:
1、這是一個可以無限擴大的執行緒池;
2、適合處理執行時間比較小的任務;
3、執行緒空閒時間超過60s就會被殺死,所以長時間處於空閒狀態的時候,這種執行緒池幾乎不佔用資源;
4、阻塞佇列沒有儲存空間,只要請求到來,就必須找到一條空閒執行緒去處理這個請求,找不到則線上程池新開闢一條執行緒。
下面測試一下這種執行緒池:
public class Test {
public static void main(String[] args) {
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int index = i;// java中可笑的閉包 得傳final型別
cachedThreadPool.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(100);//讓當前執行緒休息一會兒
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"執行"+index);//列印當前執行緒
}
});
}
cachedThreadPool.shutdown();
}
}
在這個測試當中,讓CachedThreadPool中的執行緒執行完就休息100ms,這樣使得另一個請求來的時候便建立一個新的執行緒,於是執行結果是這樣的:
每當一個新的請求來的時候,由於之前的執行緒不是空閒狀態,而且這種執行緒池的阻塞佇列不能儲存,所以需要開闢新的執行緒來處理這個請求,因此就出現了這樣的情況,對於10個請求執行緒池開闢了10個執行緒來進行處理。
如果我們將程式碼改一改:
public class Test {
public static void main(String[] args) {
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int index = i;// java中可笑的閉包 得傳final型別
try {
Thread.sleep(100);//在請求處理前給之前處理請求的執行緒足夠的釋放時間 使得之前執行緒能夠繼續處理接下來的請求
} catch (InterruptedException e) {
e.printStackTrace();
}
cachedThreadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"執行"+index);
}
});
}
}
}
這種情況執行結果如下:
由於每次新的請求加入時,執行緒池中1號執行緒都是空閒狀態,所以執行緒池就不需要開闢其他的執行緒來處理新的請求,所以就一直是1號執行緒在處理請求。