java 執行緒池詳解及四種執行緒池用法介紹
java 執行緒池詳解
Executor框架是一種將執行緒的建立和執行分離的機制。它基於Executor和ExecutorService介面,及這兩個介面的實現類ThreadPoolExecutor展開,Executor有一個內部執行緒池,並提供了將任務傳遞到池中執行緒以獲得執行的方法,可傳遞的任務有如下兩種:通過Runnable介面實現的任務和通過Callable介面實現的任務。在這兩種情況下,只需要傳遞任務到執行器,執行器即可使用執行緒池中的執行緒或新建立的執行緒來執行任務。執行器也決定了任務執行時間。
java提供了四種執行緒池的實現:
(1)newCachedThreadPool建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒。
(2)newFixedThreadPool 建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。
(3)newScheduledThreadPool 建立一個定長執行緒池,支援定時及週期性任務執行。
(4)newSingleThreadExecutor 建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。
下面就對這些執行緒池的使用方式進行簡要的程式碼介紹:
首先是可快取執行緒池:
然後是定長執行緒池:import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ThreadPoolDemo { public static void main (String[] args) { ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); for(int i = 0; i < 10; i++) { final int index = i; try { Thread.sleep(index*1000); } catch (InterruptedException e) { e.printStackTrace(); } cachedThreadPool.execute(new Runnable () { public void run () { System.out.println(index); } }); } } }
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ThreadPoolDemo { public static void main (String[] args) { ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3); for(int i = 0; i < 10; i++) { final int index = i; fixedThreadPool.execute(new Runnable () { public void run () { System.out.println(index); try { Thread.sleep(2000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); } } }
然後是定長執行緒池支援定時和週期性任務:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ThreadPoolDemo {
public static void main (String[] args) {
ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(5);
scheduleThreadPool.schedule(new Runnable() {
public void run() {
System.out.println("delay 3 seconds");
}
}, 3, TimeUnit.SECONDS);
}
}
最後是:單執行緒化執行緒池
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ThreadPoolDemo {
public static void main (String[] args) {
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for(int i = 0; i < 10; i++) {
final int index = i;
singleThreadExecutor.execute(new Runnable() {
public void run() {
try {
System.out.println(index);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
}
好了四種執行緒池的使用上面已經介紹完了,現在來看看執行緒池的原理吧,其中最重要的就是ThreadPoolExecutor類的建構函式,你會有疑惑,上面程式壓根沒有出現這個ThreadPoolExecutor類啊,其實,如果你追程式碼到
Executors.newSingleThreadExecutor()
中檢視你會發現如下程式碼: public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
這四類執行緒池類底層都是ThreadPoolExecutor類進行初始化的,你不信你可以一個一個點進去看一下,而且我告訴你,四大執行緒池是通過使用ThreadPoolExecutor建構函式實現的;你看看下面實現就知道了
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);//這邊居然不是直接調ThreadPoolExecutor建構函式,但是我們追一下程式碼看看下面這個函式你就會明白
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());//裡面使用了父類的建構函式,下面就是本類和父類的繼承關係,看看他的父類是什麼,你就明白了
}
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService
OK。到這裡,你就應該知道我們今天的主角是誰了---------ThreadPoolExecutor
好了我們可以來看看這個類的建構函式原始碼了:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
看見有好多引數啊,現在主要對其中的構造引數進行解釋:
corePoolSize:核心池大小,意思是當超過這個範圍的時候,就需要將新的執行緒放到等待佇列中了即workQueue;
maximumPoolSize:執行緒池最大執行緒數量,表明執行緒池能建立的最大執行緒數
keepAlivertime:當活躍執行緒數大於核心執行緒數,空閒的多餘執行緒最大存活時間。
unit:存活時間的單位
workQueue:存放任務的佇列---阻塞佇列
handler:超出執行緒範圍(maximumPoolSize)和佇列容量的任務的處理程式
我們執行執行緒時都會呼叫到ThreadPoolExecutor的execute()方法,現在我們來看看這個方法的原始碼(就是下面這段程式碼了,這裡面有一些註釋解析),我直接來解釋一下吧:在這段程式碼中我們至少要看懂一個邏輯:噹噹前執行緒數小於核心池執行緒數時,只需要新增一個執行緒並且啟動它,如果執行緒數數目大於核心執行緒池數目,我們將任務放到workQueue中,如果連workQueue滿了,那麼就要拒絕任務了。詳細的函式我就不介紹了
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}