Java執行緒池原始碼解析及高質量程式碼案例
引言
本文為Java高階程式設計中的一些知識總結,其中第一章對Jdk 1.7.0_25中的多執行緒架構中的執行緒池ThreadPoolExecutor原始碼進行架構原理介紹以及原始碼解析。第二章則分析了幾個違反Java高質量程式碼案例以及相應解決辦法。如有總結的不好的地方,歡迎大家提出寶貴的意見和建議。
Java執行緒池架構原理及原始碼解析
ThreadPoolExecutor是一個 ExecutorService,它使用可能的幾個池執行緒之一執行每個提交的任務,通常使用 Executors 工廠方法配置。執行緒池可以解決兩個不同問題:由於減少了每個任務呼叫的開銷,它們通常可以在執行大量非同步任務時提供增強的效能,並且還可以提供繫結和管理資源(包括執行任務集時使用的執行緒)的方法。每個 ThreadPoolExecutor 還維護著一些基本的統計資料,如完成的任務數。
構建引數原始碼
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
{
this (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
引數解釋
corePoolSize:核心執行緒數,會一直存活,即使沒有任務,執行緒池也會維護執行緒的最少數量。
maximumPoolSize: 執行緒池維護執行緒的最大數量。
keepAliveTime: 執行緒池維護執行緒所允許的空閒時間,當執行緒空閒時間達到keepAliveTime,該執行緒會退出,直到執行緒數量等於corePoolSize。如果allowCoreThreadTimeout設定為
true,則所有執行緒均會退出直到執行緒數量為0。
unit: 執行緒池維護執行緒所允許的空閒時間的單位、可選引數值為:TimeUnit中的幾個靜態屬性:NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。workQueue:執行緒池所使用的緩衝佇列,常用的是:java.util.concurrent.ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue。
handler: 執行緒池中的數量大於maximumPoolSize,對拒絕任務的處理策略,預設值ThreadPoolExecutor.AbortPolicy()。
原始碼詳細解析
excute原始碼
public void execute(Runnable command)
{
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))
{
if (runState == RUNNING && workQueue.offer(command))
{
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
}
一個任務通過 execute(Runnable)方法被新增到執行緒池,任務就是一個Runnable型別的物件,任務的執行方法就是run()方法,如果傳入的為null,側丟擲NullPointerException。
首先第一個判定空操作就不用說了,下面判定的poolSize >= corePoolSize成立時候會進入if的區域,當然它不成立也有可能會進入,他會判定addIfUnderCorePoolSize是否返回false,如果返回false就會進去。
如果當前執行緒數小於corePoolSize,呼叫addIfUnderCorePoolSize方法,addIfUnderCorePoolSize方法首先呼叫mainLock加鎖,再次判斷當前執行緒數小於corePoolSize並且執行緒池處於RUNNING狀態,則呼叫addThread增加執行緒。
圖一:ThreadPoolExecutor執行狀態圖
addIfUnderCorePoolSize原始碼
private boolean addIfUnderCorePoolSize(Runnable firstTask)
{
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
{
if (poolSize < corePoolSize && runState == RUNNING)
t = addThread(firstTask);
}
finally
{
mainLock.unlock();
}
if (t == null)
return false;
t.start();
return true;
}
addThread方法首先建立Work物件,然後呼叫threadFactory建立新的執行緒,如果建立的執行緒不為null,將Work物件的 thread屬性設定為此創建出來的執行緒,並將此Work物件放入workers中,然後在增加當前執行緒池的中執行緒數,增加後回到 addIfUnderCorePoolSize方法 ,釋放mainLock,最後啟動這個新建立的執行緒來執行新傳入的任務。
可以發現,這段原始碼是如果發現小於corePoolSize就會建立一個新的執行緒,並且呼叫執行緒的start()方法將執行緒執行起來:這個addThread()方法,我們先不考慮細節,因為我們還要先看到前面是怎麼進去的,這裡可以發信啊,只有沒有建立成功Thread才會返回false,也就是噹噹前的poolSize > corePoolSize的時候,或執行緒池已經不是在running狀態的時候才會出現。
注意:這裡在外部判定一次poolSize和corePoolSize只是初步判定,內部是加鎖後判定的,以得到更為準確的結果,而外部初步判定如果是大於了,就沒有必要進入這段有鎖的程式碼了。
addThread原始碼
private Thread addThread(Runnable firstTask)
{
Worker w = new Worker(firstTask);
Thread t = threadFactory.newThread(w);
< span style = "color:#ff0000;" > < / span >
if (t != null)
{
w.thread = t;
workers.add(w);
int nt = ++poolSize;
if (nt > largestPoolSize)
largestPoolSize = nt;
}
return t;
}
ThreadFactory介面預設實現DefaultThreadFactory
public Thread newThread(Runnable r)
{
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
這裡建立了一個Work,其餘的操作,就是講poolSize疊加,然後將將其放入workers的執行佇列等操作;
我們主要關心Worker是幹什麼的,因為這個threadFactory對我們用途不大,只是做了Thread的命名處理;而Worker你會發現它的定義也是一個Runnable,外部開始在程式碼段中發現了呼叫哪個這個Worker的start()方法,也就是執行緒的啟動方法,其實也就是呼叫了Worker的run()方法,那麼我們重點要關心run方法是如何處理的。
Worker的run方法
public void run()
{
try
{
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null)
{
runTask(task);
task = null;
}
}
finally
{
workerDone(this);
}
}
從以上方法可以看出,Worker所在的執行緒啟動後,首先執行建立其時傳入的Runnable任務,執行完成後,迴圈呼叫getTask來獲取新的任務,在沒有任務的情況下,退出此執行緒。FirstTask其實就是開始在建立work的時候,由外部傳入的Runnable物件,也就是你自己的Thread,你會發現它如果發現task為空,就會呼叫getTask()方法再判定,直到兩者為空,並且是一個while迴圈體。
getTask原始碼
Runnable getTask()
{
for (;;)
{
try
{
int state = runState;
if (state > SHUTDOWN)
return null;
Runnable r;
if (state == SHUTDOWN) // Help drain queue
r = workQueue.poll();
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
r = workQueue.take();
if (r != null)
return r;
if (workerCanExit())
{
if (runState >= SHUTDOWN) // Wake up others
interruptIdleWorkers();
return null;
}
// Else retry
}
catch (InterruptedException ie)
{
// On interruption, re-check runState
}
}
}
你會發現它是從workQueue佇列中,也就是等待佇列中獲取一個元素出來並返回!當前執行緒執行完後,在到workQueue中去獲取一個task出來,繼續執行,這樣就保證了執行緒池中有一定的執行緒一直在執行;此時若跳出了while循 環,只有workQueue佇列為空才會出現或出現了類似於shutdown的操作,自然執行佇列會減少1,當再有新的執行緒進來的時候,就又開始向 worker裡面放資料了,這樣以此類推,實現了執行緒池的功能。
execute方法部分實現
if (runState == RUNNING && workQueue.offer(command))
{
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
如果當前執行緒池數量大於corePoolSize或addIfUnderCorePoolSize方法執行失敗,則執行後續操作;如果執行緒池處於執行狀態 並且workQueue中成功加入任務,再次判斷如果執行緒池的狀態不為執行狀態或當前執行緒池數為0,則呼叫 ensureQueuedTaskHandled方法。
ensureQueuedTaskHandled原始碼
private void ensureQueuedTaskHandled(Runnable command)
{
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
boolean reject = false;
Thread t = null;
try
{
int state = runState;
if (state != RUNNING && workQueue.remove(command))
reject = true;
else if (state < STOP &&
poolSize < Math.max(corePoolSize, 1) &&
!workQueue.isEmpty())
t = addThread(null);
}
finally
{
mainLock.unlock();
}
if (reject)
reject(command);
else if (t != null)
t.start();
}
第一個if,也就是噹噹前狀態為running的時候,就會去執行workQueue.offer(command),這個workQueue其實就是一 個BlockingQueue,offer()操作就是在佇列的尾部寫入一個物件,此時寫入的物件為執行緒的物件而已;所以你可以認為只有執行緒池在 RUNNING狀態,才會在佇列尾部插入資料,否則就執行else if,其實else if可以看出是要做一個是否大於MaximumPoolSize的判定,如果大於這個值,就會做reject的操作。ensureQueuedTaskHandled方法判斷執行緒池執行,如果狀態不為執行狀態,從workQueue中刪除,並呼叫reject做拒絕處理。
reject原始碼
void reject(Runnable command)
{
handler.rejectedExecution(command, this);
}
再次回到execute方法
if (runState == RUNNING && workQueue.offer(command))
{
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
如執行緒池workQueue offer失敗或不處於執行狀態,呼叫addIfUnderMaximumPoolSize, addIfUnderMaximumPoolSize方法基本和addIfUnderCorePoolSize實現類似,不同點在於根據最大執行緒數(maximumPoolSize)進行比較,如果超過最大執行緒數,返回false,呼叫reject方法。
addIfUnderMaximumPoolSize原始碼
private boolean addIfUnderMaximumPoolSize(Runnable firstTask)
{
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
{
if (poolSize < maximumPoolSize && runState == RUNNING)
t = addThread(firstTask);
}
finally
{
mainLock.unlock();
}
if (t == null)
return false;
t.start();
return true;
}
也就是如果執行緒池滿了,而且執行緒池呼叫了shutdown後,還在呼叫execute方法時,就會丟擲上面說明的異常:RejectedExecutionException。
workerDone原始碼
void workerDone(Worker w)
{
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
{
completedTaskCount += w.completedTasks;
workers.remove(w);
if (--poolSize == 0)
tryTerminate();
}
finally
{
mainLock.unlock();
}
}
注意這裡將workers.remove(w)掉,並且呼叫了—poolSize
來做操作。至於tryTerminate是做了更多關於回收方面的操作。
runTask(task)原始碼
private void runTask(Runnable task)
{
final ReentrantLock runLock = this.runLock;
runLock.lock();
try
{
if (runState < STOP &&
Thread.interrupted() &&
runState >= STOP)
thread.interrupt();
boolean ran = false;
beforeExecute(thread, task);
try
{
task.run();
ran = true;
afterExecute(task, null);
++completedTasks;
}
catch (RuntimeException ex)
{
if (!ran)
afterExecute(task, ex);
throw ex;
}
}
finally
{
runLock.unlock();
}
}
你可以看到,這裡面的task為傳入的task資訊,呼叫的不是start方法,而是run方法,因為run方法直接呼叫不會啟動新的執行緒,也是因為這樣,導致了你無法獲取到你自己的執行緒的狀態,因為執行緒池是直接呼叫的run方法,而不是start方法來執行。
這裡有個beforeExecute和afterExecute方法,分別代表在執行前和執行後,你可以做一段操作,在這個類中,這兩個方法都是空的,因為普通執行緒池無需做更多的操作。
如果你要實現類似暫停等待通知的或其他的操作,可以自己extends後進行重寫構造。
新增任務處理流程
AbortPolicy()
public static class AbortPolicy implements RejectedExecutionHandler
{
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always.
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
{
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
/*當執行緒池中的數量等於最大執行緒數時,直接丟擲丟擲java.util.concurrent.RejectedExecutionException異常。*/
CallerRunsPolicy()
public static class CallerRunsPolicy implements RejectedExecutionHandler
{
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
{
if (!e.isShutdown())
{
r.run();
}
}
}
當執行緒池中的數量等於最大執行緒數時、重試執行當前的任務,交由呼叫者執行緒來執行任務。
DiscardOldestPolicy()
public static class DiscardOldestPolicy implements RejectedExecutionHandler
{
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
{
if (!e.isShutdown())
{
e.getQueue().poll();
e.execute(r);
}
}
}
當執行緒池中的數量等於最大執行緒數時、拋棄執行緒池中最後一個要執行的任務,並執行新傳入的任務。
DiscardPolicy()
public static class DiscardPolicy implements RejectedExecutionHandler
{
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
{
}
}
當執行緒池中的數量等於最大執行緒數時,不做任何動作。
通常你得到執行緒池後,會呼叫其中的:submit方法或execute方法去操作;其實你會發現,submit方法最終會呼叫execute方法來進行操 作,只是他提供了一個Future來託管返回值的處理而已,當你呼叫需要有返回值的資訊時,你用它來處理是比較好的;這個Future會包裝對 Callable資訊,並定義一個Sync物件,當你發生讀取返回值的操作的時候,會通過Sync物件進入鎖,直到有返回值的資料通知。
違反Java高質量程式碼案例
非同步運算使用Callable介面
Callable介面程式碼如下:
public interface Callable<V>{
v call() throws Exception;
}
實現Callable介面,只是表明它是一個可呼叫的任務,並不表示它具有多執行緒運算的能力,還是要執行器來執行。程式碼如下:
class TaxCalculator implements Callable<Integer>{
private int seedMoney;
public TaxCalculator(int _seedMoney){
seedMoney=_seedMoney;
}
@Override
public Integer call() throws Exception {
TimeUnit.MILLISECONDS.sleep(10000);
return seedMoney/10;
}
}
這裡模擬稅款計算器運算,可能花費10秒鐘時間。使用者輸入即有輸出,若耗時較長,則顯示運算進度。如果我們直接計算,就只有一個main執行緒,是不可能友好提示的,如果稅金不計算完畢,也不會執行後續動作,所以最好的辦法就是重啟一個執行緒來運算,讓main執行緒做進度提示
public static void main(String[] args) throws Exception{
ExecutorService es=Executors.newSingleThreadExecutor();
Future<Integer> future=es.submit(new TaxCalculator(100));
while(!future.isDone()){
TimeUnit.MILLISECONDS.sleep(200);
System.out.println("#");
}
System.out.println("\n 計算完成,稅金是:"+future.get()+"元");
es.shutdown();
}
Executors是一個靜態工具類,提供了非同步執行器的建立能力,如單執行緒執行newSingleThreadExcutor、固定執行緒數量的執行器newFixedThreadPool等,一般是非同步計算的入口類。
優先選擇執行緒池
執行緒的狀態只能由新建狀態轉變為執行態後才可能被阻塞或等待,最後終結,不可能產生本末倒置的情況,程式碼如下:
public static void main(String[] args) throws Exception{
Thread t=new Thread(new Runnable() {
@Override
public void run() {
System.out.println("執行緒在執行");
}
});
t.start();
while(!t.getState().equals(Thread.State.TERMINATED)){
TimeUnit.MILLISECONDS.sleep(10);
}
t.start();
}
此時程式執行會報IllegalThreadStateException異常,原因就是不能從結束狀態直接轉換為可執行狀態。這時可以引入執行緒池,當系統需要時直接從執行緒池中獲得執行緒,運算出結果,再把執行緒返回到執行緒池中,程式碼如下:
public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(2);
for (int i = 0; i < 4; i++) {
es.submit(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
});
}
es.shutdown();
}
執行緒死鎖
Java是單執行緒語言,一旦執行緒死鎖,只能藉助外部程序重啟應用才能解決。
static class A {
public synchronized void a1(B b) {
String name = Thread.currentThread().getName();
System.out.println(name + "進入A.a1()");
try {
Thread.sleep(1000);
} catch (Exception e) {
// TODO: handle exception
}
System.out.println(name + "試圖訪問B.b2()");
b.b2();
}
public synchronized void a2() {
System.out.println("進入 a.a2()");
}
}
static class B {
public synchronized void b1(A a) {
String name = Thread.currentThread().getName();
System.out.println(name + "進入B.b1()");
try {
Thread.sleep(1000);
} catch (Exception e) {
// TODO: handle exception
}
System.out.println(name + "試圖訪問A.a2()");
a.a2();
}
public synchronized void b2() {
System.out.println("進入 B.b2()");
}
}
public static void main(String[] args) {
final A a = new A();
final B b = new B();
new Thread(new Runnable() {
@Override
public void run() {
a.a1(b);
}
}, "執行緒A").start();
;
new Thread(new Runnable() {
@Override
public void run() {
b.b1(a);
}
}, "執行緒B").start();
;
}
此段程式定義了兩個資源A和B,然後在兩個執行緒A、B中使用了該資源,由於兩個資源之間有互動操作,並且都是同步方法,因此線上程A休眠1秒鐘後,它會試圖訪問資源B的b2方法,但是執行緒B持有該類的鎖,並同時在等待A執行緒釋放其鎖資源,所以此時就出現了兩個執行緒在互相等待釋放資源的情況,也就是死鎖。可以使用自旋鎖改進,程式碼如下:
public void b2()
{
try
{
if(Lock.trylock(2, TimeUnit.SECONDS))
{
System.out.println("進入 B.b2()");
}
}
catch (InterruptedException e)
{
// TODO: handle exception
}
finally
{
Lock.unlock();
}
}
它原理和互斥鎖一樣,如果一個執行單元要想訪問被自旋鎖保護的共享資源,則必須先得到鎖,在訪問完共享資源後,也必須釋放鎖。
忽略設定阻塞佇列長度
BlockingQueue是一種集合,實現了Collection介面,容量是不可以自行管理的,程式碼如下:
public static void main(String[] args) throws Exception {
BlockingDeque<String> bq = (BlockingDeque<String>) new ArrayBlockingQueue<String>(
5);
for (int i = 0; i < 10; i++) {
bq.add("");
}
}
阻塞佇列容量是固定的,非阻塞佇列則是變長的。阻塞佇列可以在宣告是指定佇列的容量,若指定的容量,則元素的數量不可超過該容量,若不指定,佇列的容量為Integer的最大值
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements
BlockingDeque<E>, java.io.Serializable
{
public final E[] items;
private int count;
public boolean add(E e)
{
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
public boolean offer(E e)
{
final ReentrantLock lock = this.lock;
lock.lock();
try
{
if (count == items.length)
;
else
{
insert(e);
return true;
}
}
finally
{
lock.unlock();
}
}
}
上面在加入元素時,如果判斷當前佇列已滿,則返回false,表示插入失敗,之後再包裝成佇列滿異常。
使用stop方法停止執行緒
stop方法會破壞原子邏輯,程式碼如下:
class MutiThread implements Runnable {
int a = 0;
@Override
public void run() {
// TODO Auto-generated method stub
synchronized ("") {
a++;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
a--;
String tn = Thread.currentThread().getName();
System.out.println(tn + ":a=" + a);
}
}
public static void main(String[] args) {
MutiThread t = new MutiThread();
Thread t1 = new Thread(t);
t1.start();
for (int i = 0; i < 5; i++) {
new Thread(t).start();
}
t1.stop();
}
}
所有執行緒共享了一個MutilThread的例項變數t,由於在run方法中加入了同步程式碼塊,所以只能有一個執行緒進入到synchronized塊中,可以自定義標誌位來決定執行緒執行情況,程式碼如下:
class SafeStopThread extends Thread{
private volatile boolean stop=false;
@Override
public void run()
{//判斷執行緒體是否執行
while(stop)
{}
}
//執行緒終止
public void terminate(){
stop=true;
}
}
線上程體中判斷是否需要停止執行,即可保證執行緒體的邏輯完整性,而且也不會破壞原子邏輯。
覆寫start方法
程式碼:
class MutiThread implements Thread
{
@Override
public void start()
{
//呼叫執行緒體
run();
}
}
@Override
public void run()
{
}
}
public static void main(String[] args)
{
MutiThread t = new MutiThread();
t.start();
}
}
main方法根本就沒有啟動一個子執行緒,整個應用程式中只有一個主執行緒在執行,並不會建立其他的執行緒。改進後代碼如下:
class MutiThread implements Thread
{
@Override
public void start()
{
/*執行緒啟動前的業務處理*/
super.start();
/*執行緒啟動後的業務處理*/
}
}
@Override
public void run()
{
}
}
start方法呼叫父類的start方法,沒有主動呼叫run方法,由JVM自行呼叫,不用我們的顯式實現。
使用過多執行緒優先順序
Java執行緒有10個基本,級別為0代表JVM
程式碼如下:
class MutiThread implements Runnable {
public void start(int _priority) {
Thread t = new Thread(this);
t.setPriority(_priority);
t.start();
}
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
Math.hypot(Math.pow(924526789, i), Math.cos(i));
}
System.out.println("Priority:"+Thread.currentThread().getPriority());
}
public static void main(String[] args) {
for(int i=0;i<20;i++)
{
new MutiThread().start(i%10+1);
}
}
}
Java優先順序只是代表搶佔CPU機會大小,優先順序越高,搶佔CPU機會越大,被優先執行的可能性越高,優先順序相差不大,則搶佔CPU機會差別也不大。導致優先順序為9的執行緒比優先順序為10的執行緒先執行。於是在Thread類中設定三個優先順序,建議使用優先順序常量,而不是1到10的隨機數字,程式碼如下:
public final static int MIN_PRIORITY = 1;
/**
* The default priority that is assigned to a thread.
*/
public final static int NORM_PRIORITY = 5;
/**
* The maximum priority that a thread can have.
*/
public final static int MAX_PRIORITY = 10;
/**
* Returns a reference to the currently executing thread object.
*
* @return the currently executing thread.
*/
}
Lock與synchronized
Lock為顯式鎖,synchronized為內部鎖,程式碼如下:
class Task
{
public void dosomething(){
try {
Thread.sleep(2000);
} catch (Exception e) {
// TODO: handle exception
}
StringBuffer sb=new StringBuffer();
sb.append("執行緒名:"+Thread.currentThread().getName());
sb.append(",執行緒時間:"+Calendar.getInstance().get(13)+"s");
System.out.println(sb);
}
}
//顯示鎖任務
class TaskWithLock extends Task implements Runnable{
private final Lock lock=new ReentrantLock();
@Override
public void run() {
try {
lock.lock();
dosomething();
} finally
{
lock.unlock();
}
}};
//內部鎖任務
class TaskWithSync extends Task implements Runnable{
@Override
public void run() {
synchronized ("A") {
dosomething();
}
}};
對於同步資源來說,顯式鎖時物件級別的鎖,而內部鎖時類級別的鎖,也就是說lock鎖時跟隨物件的,synchronized鎖時跟隨類
改進方法:把Lock定義為所有執行緒的共享變數。
public static void main(String[] args) {
//多個執行緒共享鎖
final Lock lock=new ReentrantLock();
……
}
執行緒池異常處理
Java中執行緒執行的任務介面java.lang.Runnable 要求不丟擲Checked異常,
public interface Runnable {
public abstract void run();
}
那麼如果 run() 方法中丟擲了RuntimeException,將會怎麼處理了?
通常java.lang.Thread物件執行設定一個預設的異常處理方法:
java.lang.Thread.setDefaultUncaughtExceptionHandler(UncaughtExceptionHandler)
而這個預設的靜態全域性的異常捕獲方法時輸出堆疊。當然,我們可以覆蓋此預設實現,只需要一個自定義的java.lang.Thread.UncaughtExceptionHandler
介面實現即可。
public interface UncaughtExceptionHandler {
void uncaughtException(Thread t, Throwable e);
}
而線上程池中卻比較特殊。預設情況下,執行緒池 java.util.concurrent.ThreadPoolExecutor 會Catch住所有異常, 當任務執行完成(java.util.concurrent.ExecutorService.submit(Callable))獲取其結果 時(java.util.concurrent.Future.get())會丟擲此RuntimeException。
/**
* Waits if necessary for the computation to complete, and then
* retrieves its result.
*
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an exception
* @throws InterruptedException if the current thread was interrupted while waiting
*/
V get() throws InterruptedException, ExecutionException;
其中 ExecutionException
異常即是java.lang.Runnable
或者 java.util.concurrent.Callable
丟擲的異常。
也就是說,執行緒池在執行任務時捕獲了所有異常,並將此異常加入結果中。這樣一來執行緒池中的所有執行緒都將無法捕獲到丟擲的異常。 從而無法通過設定執行緒的預設捕獲方法攔截的錯誤異常。也不同通過 自定義執行緒來完成異常的攔截。好在java.util.concurrent.ThreadPoolExecutor 預留了一個方法,執行在任務執行完畢進行擴充套件(當然也預留一個protected方法beforeExecute(Thread t, Runnable r)):
protected void afterExecute(Runnable r, Throwable t) { }
此方法的預設實現為空,這樣我們就可以通過繼承或者覆蓋ThreadPoolExecutor 來達到自定義的錯誤處理。
解決辦法如下:
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(11, 100, 1, TimeUnit.MINUTES, //
new ArrayBlockingQueue<Runnable>(10000),//
new DefaultThreadFactory()) {
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
printException(r, t);
}
};
private static void printException(Runnable r, Throwable t) {
if (t == null && r instanceof Future<?>) {
try {
Future<?> future = (Future<?>) r;
if (future.isDone())
future.get();
} catch (CancellationException ce) {
t = ce;
} catch (ExecutionException ee) {
t = ee.getCause();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // ignore/reset
}
}
if (t != null)
log.error(t.getMessage(), t);
}
使用SimpleThread類
TestThreadPool類是一個測試程式,用來模擬客戶端的請求,當你執行它時,系統首先會顯示執行緒池的初始化資訊,然後提示你從鍵盤上輸入字串,並按下回車鍵,這時你會發現螢幕上顯示資訊,告訴你某個執行緒正在處理你的請求,如果你快速地輸入一行行字串,那麼你會發現執行緒池中不斷有執行緒被喚醒,來處理你的請求,在本例中,我建立了一個擁有10個執行緒的執行緒池,如果執行緒池中沒有可用執行緒了,系統會提示你相應的警告資訊,但如果你稍等片刻,那你會發現螢幕上會陸陸續續提示有執行緒進入了睡眠狀態,這時你又可以傳送新的請求了。
程式碼如下:
//TestThreadPool.java
import java.io.*;
public class TestThreadPool
{
public static void main(String[] args)
{
try
{
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
String s;
ThreadPoolManager manager = new ThreadPoolManager(10);
while((s = br.readLine()) != null)
{
manager.process(s);
}
}
catch(IOException e) {}
}
}
ThreadPoolManager類,顧名思義,它是一個用於管理執行緒池的類,它的主要職責是初始化執行緒池,併為客戶端的請求分配不同的執行緒來進行處理,如果執行緒池滿了,它會對你發出警告資訊。
程式碼如下:
import java.util.*;
class ThreadPoolManager
{
private int maxThread;
public Vector vector;
public void setMaxThread(int threadCount)
{
maxThread = threadCount;
}
public ThreadPoolManager(int threadCount)
{
setMaxThread(threadCount);
System.out.println("Starting thread pool...");
vector = new Vector();
for(int i = 1; i <= 10; i++)
{
SimpleThread thread = new SimpleThread(i);
vector.addElement(thread);
thread.start();
}
}
public void process(String argument)
{
int i;
for(i = 0; i < vector.size(); i++)
{
SimpleThread currentThread = (SimpleThread)vector.elementAt(i);
if(!currentThread.isRunning())
{
System.out.println("Thread " + (i + 1) + " is processing:" +