1. 程式人生 > >Java執行緒池原始碼解析及高質量程式碼案例

Java執行緒池原始碼解析及高質量程式碼案例

引言

本文為Java高階程式設計中的一些知識總結,其中第一章對Jdk 1.7.0_25中的多執行緒架構中的執行緒池ThreadPoolExecutor原始碼進行架構原理介紹以及原始碼解析。第二章則分析了幾個違反Java高質量程式碼案例以及相應解決辦法。如有總結的不好的地方,歡迎大家提出寶貴的意見和建議。

Java執行緒池架構原理及原始碼解析

ThreadPoolExecutor是一個 ExecutorService,它使用可能的幾個池執行緒之一執行每個提交的任務,通常使用 Executors 工廠方法配置。執行緒池可以解決兩個不同問題:由於減少了每個任務呼叫的開銷,它們通常可以在執行大量非同步任務時提供增強的效能,並且還可以提供繫結和管理資源(包括執行任務集時使用的執行緒)的方法。每個 ThreadPoolExecutor 還維護著一些基本的統計資料,如完成的任務數。

構建引數原始碼

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler)
{
    this
(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); }

引數解釋

  • corePoolSize:核心執行緒數,會一直存活,即使沒有任務,執行緒池也會維護執行緒的最少數量。

  • maximumPoolSize: 執行緒池維護執行緒的最大數量。

  • keepAliveTime: 執行緒池維護執行緒所允許的空閒時間,當執行緒空閒時間達到keepAliveTime,該執行緒會退出,直到執行緒數量等於corePoolSize。如果allowCoreThreadTimeout設定為
    true,則所有執行緒均會退出直到執行緒數量為0。
    unit: 執行緒池維護執行緒所允許的空閒時間的單位、可選引數值為:TimeUnit中的幾個靜態屬性:NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。

  • workQueue:執行緒池所使用的緩衝佇列,常用的是:java.util.concurrent.ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue。

  • handler: 執行緒池中的數量大於maximumPoolSize,對拒絕任務的處理策略,預設值ThreadPoolExecutor.AbortPolicy()。

原始碼詳細解析

excute原始碼

public void execute(Runnable command)
{
    if (command == null)
        throw new NullPointerException();
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))
    {
        if (runState == RUNNING && workQueue.offer(command))
        {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}

一個任務通過 execute(Runnable)方法被新增到執行緒池,任務就是一個Runnable型別的物件,任務的執行方法就是run()方法,如果傳入的為null,側丟擲NullPointerException。
首先第一個判定空操作就不用說了,下面判定的poolSize >= corePoolSize成立時候會進入if的區域,當然它不成立也有可能會進入,他會判定addIfUnderCorePoolSize是否返回false,如果返回false就會進去。
如果當前執行緒數小於corePoolSize,呼叫addIfUnderCorePoolSize方法,addIfUnderCorePoolSize方法首先呼叫mainLock加鎖,再次判斷當前執行緒數小於corePoolSize並且執行緒池處於RUNNING狀態,則呼叫addThread增加執行緒。



圖一:ThreadPoolExecutor執行狀態圖

addIfUnderCorePoolSize原始碼

private boolean addIfUnderCorePoolSize(Runnable firstTask)
{
    Thread t = null;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try
    {
        if (poolSize < corePoolSize && runState == RUNNING)
            t = addThread(firstTask);
    }
    finally
    {
        mainLock.unlock();
    }
    if (t == null)
        return false;
    t.start();
    return true;
}

addThread方法首先建立Work物件,然後呼叫threadFactory建立新的執行緒,如果建立的執行緒不為null,將Work物件的 thread屬性設定為此創建出來的執行緒,並將此Work物件放入workers中,然後在增加當前執行緒池的中執行緒數,增加後回到 addIfUnderCorePoolSize方法 ,釋放mainLock,最後啟動這個新建立的執行緒來執行新傳入的任務。
可以發現,這段原始碼是如果發現小於corePoolSize就會建立一個新的執行緒,並且呼叫執行緒的start()方法將執行緒執行起來:這個addThread()方法,我們先不考慮細節,因為我們還要先看到前面是怎麼進去的,這裡可以發信啊,只有沒有建立成功Thread才會返回false,也就是噹噹前的poolSize > corePoolSize的時候,或執行緒池已經不是在running狀態的時候才會出現。
注意:這裡在外部判定一次poolSize和corePoolSize只是初步判定,內部是加鎖後判定的,以得到更為準確的結果,而外部初步判定如果是大於了,就沒有必要進入這段有鎖的程式碼了。

addThread原始碼

private Thread addThread(Runnable firstTask)
{
    Worker w = new Worker(firstTask);
    Thread t = threadFactory.newThread(w);
    < span style = "color:#ff0000;" > < / span >
                   if (t != null)
    {
        w.thread = t;
        workers.add(w);
        int nt = ++poolSize;
        if (nt > largestPoolSize)
            largestPoolSize = nt;
    }
    return t;
}

ThreadFactory介面預設實現DefaultThreadFactory

public Thread newThread(Runnable r)
{
    Thread t = new Thread(group, r,
                          namePrefix + threadNumber.getAndIncrement(),
                          0);
    if (t.isDaemon())
        t.setDaemon(false);
    if (t.getPriority() != Thread.NORM_PRIORITY)
        t.setPriority(Thread.NORM_PRIORITY);
    return t;
}

這裡建立了一個Work,其餘的操作,就是講poolSize疊加,然後將將其放入workers的執行佇列等操作;
我們主要關心Worker是幹什麼的,因為這個threadFactory對我們用途不大,只是做了Thread的命名處理;而Worker你會發現它的定義也是一個Runnable,外部開始在程式碼段中發現了呼叫哪個這個Worker的start()方法,也就是執行緒的啟動方法,其實也就是呼叫了Worker的run()方法,那麼我們重點要關心run方法是如何處理的。

Worker的run方法

public void run()
{
    try
    {
        Runnable task = firstTask;
        firstTask = null;
        while (task != null || (task = getTask()) != null)
        {
            runTask(task);
            task = null;
        }
    }
    finally
    {
        workerDone(this);
    }
}

從以上方法可以看出,Worker所在的執行緒啟動後,首先執行建立其時傳入的Runnable任務,執行完成後,迴圈呼叫getTask來獲取新的任務,在沒有任務的情況下,退出此執行緒。FirstTask其實就是開始在建立work的時候,由外部傳入的Runnable物件,也就是你自己的Thread,你會發現它如果發現task為空,就會呼叫getTask()方法再判定,直到兩者為空,並且是一個while迴圈體。

getTask原始碼

Runnable getTask()
{
    for (;;)
    {
        try
        {
            int state = runState;
            if (state > SHUTDOWN)
                return null;
            Runnable r;
            if (state == SHUTDOWN)  // Help drain queue
                r = workQueue.poll();
            else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
            else
                r = workQueue.take();
            if (r != null)
                return r;
            if (workerCanExit())
            {
                if (runState >= SHUTDOWN) // Wake up others
                    interruptIdleWorkers();
                return null;
            }
            // Else retry
        }
        catch (InterruptedException ie)
        {
            // On interruption, re-check runState
        }
    }
}

你會發現它是從workQueue佇列中,也就是等待佇列中獲取一個元素出來並返回!當前執行緒執行完後,在到workQueue中去獲取一個task出來,繼續執行,這樣就保證了執行緒池中有一定的執行緒一直在執行;此時若跳出了while循 環,只有workQueue佇列為空才會出現或出現了類似於shutdown的操作,自然執行佇列會減少1,當再有新的執行緒進來的時候,就又開始向 worker裡面放資料了,這樣以此類推,實現了執行緒池的功能。

execute方法部分實現

if (runState == RUNNING && workQueue.offer(command))
{
    if (runState != RUNNING || poolSize == 0)
        ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
    reject(command); // is shutdown or saturated
如果當前執行緒池數量大於corePoolSize或addIfUnderCorePoolSize方法執行失敗,則執行後續操作;如果執行緒池處於執行狀態 並且workQueue中成功加入任務,再次判斷如果執行緒池的狀態不為執行狀態或當前執行緒池數為0,則呼叫 ensureQueuedTaskHandled方法。

ensureQueuedTaskHandled原始碼

private void ensureQueuedTaskHandled(Runnable command)
{
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    boolean reject = false;
    Thread t = null;
    try
    {
        int state = runState;
        if (state != RUNNING && workQueue.remove(command))
            reject = true;
        else if (state < STOP &&
                 poolSize < Math.max(corePoolSize, 1) &&
                 !workQueue.isEmpty())
            t = addThread(null);
    }
    finally
    {
        mainLock.unlock();
    }
    if (reject)
        reject(command);
    else if (t != null)
        t.start();
}

第一個if,也就是噹噹前狀態為running的時候,就會去執行workQueue.offer(command),這個workQueue其實就是一 個BlockingQueue,offer()操作就是在佇列的尾部寫入一個物件,此時寫入的物件為執行緒的物件而已;所以你可以認為只有執行緒池在 RUNNING狀態,才會在佇列尾部插入資料,否則就執行else if,其實else if可以看出是要做一個是否大於MaximumPoolSize的判定,如果大於這個值,就會做reject的操作。ensureQueuedTaskHandled方法判斷執行緒池執行,如果狀態不為執行狀態,從workQueue中刪除,並呼叫reject做拒絕處理。

reject原始碼

void reject(Runnable command)
{
    handler.rejectedExecution(command, this);
}

再次回到execute方法

if (runState == RUNNING && workQueue.offer(command))
{
    if (runState != RUNNING || poolSize == 0)
        ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
    reject(command); // is shutdown or saturated

如執行緒池workQueue offer失敗或不處於執行狀態,呼叫addIfUnderMaximumPoolSize, addIfUnderMaximumPoolSize方法基本和addIfUnderCorePoolSize實現類似,不同點在於根據最大執行緒數(maximumPoolSize)進行比較,如果超過最大執行緒數,返回false,呼叫reject方法。

addIfUnderMaximumPoolSize原始碼

private boolean addIfUnderMaximumPoolSize(Runnable firstTask)
{
    Thread t = null;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try
    {
        if (poolSize < maximumPoolSize && runState == RUNNING)
            t = addThread(firstTask);
    }
    finally
    {
        mainLock.unlock();
    }
    if (t == null)
        return false;
    t.start();
    return true;
}

也就是如果執行緒池滿了,而且執行緒池呼叫了shutdown後,還在呼叫execute方法時,就會丟擲上面說明的異常:RejectedExecutionException。

workerDone原始碼

void workerDone(Worker w)
{
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try
    {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
        if (--poolSize == 0)
            tryTerminate();
    }
    finally
    {
        mainLock.unlock();
    }
}

注意這裡將workers.remove(w)掉,並且呼叫了—poolSize來做操作。至於tryTerminate是做了更多關於回收方面的操作。

runTask(task)原始碼

private void runTask(Runnable task)
{
    final ReentrantLock runLock = this.runLock;
    runLock.lock();
    try
    {
        if (runState < STOP &&
                Thread.interrupted() &&
                runState >= STOP)
            thread.interrupt();
        boolean ran = false;
        beforeExecute(thread, task);
        try
        {
            task.run();
            ran = true;
            afterExecute(task, null);
            ++completedTasks;
        }
        catch (RuntimeException ex)
        {
            if (!ran)
                afterExecute(task, ex);
            throw ex;
        }
    }
    finally
    {
        runLock.unlock();
    }
}

你可以看到,這裡面的task為傳入的task資訊,呼叫的不是start方法,而是run方法,因為run方法直接呼叫不會啟動新的執行緒,也是因為這樣,導致了你無法獲取到你自己的執行緒的狀態,因為執行緒池是直接呼叫的run方法,而不是start方法來執行。
這裡有個beforeExecute和afterExecute方法,分別代表在執行前和執行後,你可以做一段操作,在這個類中,這兩個方法都是空的,因為普通執行緒池無需做更多的操作。
如果你要實現類似暫停等待通知的或其他的操作,可以自己extends後進行重寫構造。

新增任務處理流程

AbortPolicy()

public static class AbortPolicy implements RejectedExecutionHandler
{
    /**
     * Creates an {@code AbortPolicy}.
     */
    public AbortPolicy() { }

    /**
     * Always throws RejectedExecutionException.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     * @throws RejectedExecutionException always.
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
    {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}
/*當執行緒池中的數量等於最大執行緒數時,直接丟擲丟擲java.util.concurrent.RejectedExecutionException異常。*/

CallerRunsPolicy()

public static class CallerRunsPolicy implements RejectedExecutionHandler
{
    /**
     * Creates a {@code CallerRunsPolicy}.
     */
    public CallerRunsPolicy() { }

    /**
     * Executes task r in the caller's thread, unless the executor
     * has been shut down, in which case the task is discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
    {
        if (!e.isShutdown())
        {
            r.run();
        }
    }
}

當執行緒池中的數量等於最大執行緒數時、重試執行當前的任務,交由呼叫者執行緒來執行任務。

DiscardOldestPolicy()

public static class DiscardOldestPolicy implements RejectedExecutionHandler
{
    /**
     * Creates a {@code DiscardOldestPolicy} for the given executor.
     */
    public DiscardOldestPolicy() { }

    /**
     * Obtains and ignores the next task that the executor
     * would otherwise execute, if one is immediately available,
     * and then retries execution of task r, unless the executor
     * is shut down, in which case task r is instead discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
    {
        if (!e.isShutdown())
        {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}

當執行緒池中的數量等於最大執行緒數時、拋棄執行緒池中最後一個要執行的任務,並執行新傳入的任務。

DiscardPolicy()

public static class DiscardPolicy implements RejectedExecutionHandler
{
    /**
     * Creates a {@code DiscardPolicy}.
     */
    public DiscardPolicy() { }
    /**
     * Does nothing, which has the effect of discarding task r.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
    {
    }
}

當執行緒池中的數量等於最大執行緒數時,不做任何動作。
通常你得到執行緒池後,會呼叫其中的:submit方法或execute方法去操作;其實你會發現,submit方法最終會呼叫execute方法來進行操 作,只是他提供了一個Future來託管返回值的處理而已,當你呼叫需要有返回值的資訊時,你用它來處理是比較好的;這個Future會包裝對 Callable資訊,並定義一個Sync物件,當你發生讀取返回值的操作的時候,會通過Sync物件進入鎖,直到有返回值的資料通知。

違反Java高質量程式碼案例

非同步運算使用Callable介面

Callable介面程式碼如下:

public interface Callable<V>{
    v call() throws Exception;
}

實現Callable介面,只是表明它是一個可呼叫的任務,並不表示它具有多執行緒運算的能力,還是要執行器來執行。程式碼如下:

class TaxCalculator implements Callable<Integer>{

    private int seedMoney;
    public TaxCalculator(int _seedMoney){
        seedMoney=_seedMoney;
    }
    @Override
    public Integer call() throws Exception {
       TimeUnit.MILLISECONDS.sleep(10000);
        return seedMoney/10;
    }

}

這裡模擬稅款計算器運算,可能花費10秒鐘時間。使用者輸入即有輸出,若耗時較長,則顯示運算進度。如果我們直接計算,就只有一個main執行緒,是不可能友好提示的,如果稅金不計算完畢,也不會執行後續動作,所以最好的辦法就是重啟一個執行緒來運算,讓main執行緒做進度提示

public static void main(String[] args) throws Exception{
        ExecutorService es=Executors.newSingleThreadExecutor();
        Future<Integer> future=es.submit(new TaxCalculator(100));
        while(!future.isDone()){
            TimeUnit.MILLISECONDS.sleep(200);
            System.out.println("#");
        }
        System.out.println("\n 計算完成,稅金是:"+future.get()+"元");
        es.shutdown();

    }

Executors是一個靜態工具類,提供了非同步執行器的建立能力,如單執行緒執行newSingleThreadExcutor、固定執行緒數量的執行器newFixedThreadPool等,一般是非同步計算的入口類。

優先選擇執行緒池

執行緒的狀態只能由新建狀態轉變為執行態後才可能被阻塞或等待,最後終結,不可能產生本末倒置的情況,程式碼如下:

public static void main(String[] args) throws Exception{

    Thread t=new Thread(new Runnable() {

        @Override
        public void run() {
            System.out.println("執行緒在執行");

        }
    });
    t.start();
    while(!t.getState().equals(Thread.State.TERMINATED)){
        TimeUnit.MILLISECONDS.sleep(10);
    }
    t.start();
}

此時程式執行會報IllegalThreadStateException異常,原因就是不能從結束狀態直接轉換為可執行狀態。這時可以引入執行緒池,當系統需要時直接從執行緒池中獲得執行緒,運算出結果,再把執行緒返回到執行緒池中,程式碼如下:

public static void main(String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 4; i++) {
            es.submit(new Runnable() {

                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName());

                }

            });
        }
        es.shutdown();
    }

執行緒死鎖

Java是單執行緒語言,一旦執行緒死鎖,只能藉助外部程序重啟應用才能解決。

static class A {
        public synchronized void a1(B b) {
            String name = Thread.currentThread().getName();
            System.out.println(name + "進入A.a1()");
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
                // TODO: handle exception
            }
            System.out.println(name + "試圖訪問B.b2()");
            b.b2();
        }

        public synchronized void a2() {
            System.out.println("進入 a.a2()");
        }
    }

    static class B {
        public synchronized void b1(A a) {
            String name = Thread.currentThread().getName();
            System.out.println(name + "進入B.b1()");
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
                // TODO: handle exception
            }
            System.out.println(name + "試圖訪問A.a2()");
            a.a2();
        }

        public synchronized void b2() {
            System.out.println("進入 B.b2()");
        }
    }

    public static void main(String[] args) {
        final A a = new A();
        final B b = new B();
        new Thread(new Runnable() {

            @Override
            public void run() {
                a.a1(b);
            }
        }, "執行緒A").start();
        ;
        new Thread(new Runnable() {

            @Override
            public void run() {
                b.b1(a);
            }
        }, "執行緒B").start();
        ;
    }

此段程式定義了兩個資源A和B,然後在兩個執行緒A、B中使用了該資源,由於兩個資源之間有互動操作,並且都是同步方法,因此線上程A休眠1秒鐘後,它會試圖訪問資源B的b2方法,但是執行緒B持有該類的鎖,並同時在等待A執行緒釋放其鎖資源,所以此時就出現了兩個執行緒在互相等待釋放資源的情況,也就是死鎖。可以使用自旋鎖改進,程式碼如下:

public  void b2()
{
    try
    {
        if(Lock.trylock(2, TimeUnit.SECONDS))
        {
            System.out.println("進入 B.b2()");
        }
    }
    catch (InterruptedException e)
    {
        // TODO: handle exception
    }
    finally
    {
        Lock.unlock();
    }


}

它原理和互斥鎖一樣,如果一個執行單元要想訪問被自旋鎖保護的共享資源,則必須先得到鎖,在訪問完共享資源後,也必須釋放鎖。

忽略設定阻塞佇列長度

BlockingQueue是一種集合,實現了Collection介面,容量是不可以自行管理的,程式碼如下:

public static void main(String[] args) throws Exception {
        BlockingDeque<String> bq = (BlockingDeque<String>) new ArrayBlockingQueue<String>(
                5);
        for (int i = 0; i < 10; i++) {
            bq.add("");
        }
    }

阻塞佇列容量是固定的,非阻塞佇列則是變長的。阻塞佇列可以在宣告是指定佇列的容量,若指定的容量,則元素的數量不可超過該容量,若不指定,佇列的容量為Integer的最大值

public  class ArrayBlockingQueue<E> extends AbstractQueue<E> implements
    BlockingDeque<E>, java.io.Serializable
{
    public final E[] items;
    private int count;

    public boolean add(E e)
    {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

    public boolean offer(E e)
    {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try
        {
            if (count == items.length)
                ;
            else
            {
                insert(e);
                return true;
            }
        }
        finally
        {
            lock.unlock();
        }
    }

}

上面在加入元素時,如果判斷當前佇列已滿,則返回false,表示插入失敗,之後再包裝成佇列滿異常。

使用stop方法停止執行緒

stop方法會破壞原子邏輯,程式碼如下:

class MutiThread implements Runnable {
    int a = 0;

    @Override
    public void run() {
        // TODO Auto-generated method stub
        synchronized ("") {
            a++;
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            a--;
            String tn = Thread.currentThread().getName();
            System.out.println(tn + ":a=" + a);

        }
    }

    public static void main(String[] args) {
        MutiThread t = new MutiThread();
        Thread t1 = new Thread(t);
        t1.start();
        for (int i = 0; i < 5; i++) {
            new Thread(t).start();
        }
        t1.stop();
    }
}

所有執行緒共享了一個MutilThread的例項變數t,由於在run方法中加入了同步程式碼塊,所以只能有一個執行緒進入到synchronized塊中,可以自定義標誌位來決定執行緒執行情況,程式碼如下:

class SafeStopThread extends Thread{
    private volatile boolean stop=false;
    @Override
    public void run()
    {//判斷執行緒體是否執行
        while(stop)
        {}
    }
    //執行緒終止
    public void terminate(){
        stop=true;
    }
}

線上程體中判斷是否需要停止執行,即可保證執行緒體的邏輯完整性,而且也不會破壞原子邏輯。

覆寫start方法

程式碼:

class MutiThread implements Thread
{


    @Override
    public void start()
    {
        //呼叫執行緒體
        run();
    }
}
@Override
public void run()
{

}
}
public static void main(String[] args)
{
    MutiThread t = new MutiThread();
    t.start();
}

}

main方法根本就沒有啟動一個子執行緒,整個應用程式中只有一個主執行緒在執行,並不會建立其他的執行緒。改進後代碼如下:

class MutiThread implements Thread
{


    @Override
    public void start()
    {
        /*執行緒啟動前的業務處理*/
        super.start();
        /*執行緒啟動後的業務處理*/
    }
}
@Override
public void run()
{

}
}

start方法呼叫父類的start方法,沒有主動呼叫run方法,由JVM自行呼叫,不用我們的顯式實現。

使用過多執行緒優先順序

Java執行緒有10個基本,級別為0代表JVM
程式碼如下:

class MutiThread implements Runnable {
    public void start(int _priority) {
        Thread t = new Thread(this);
        t.setPriority(_priority);
        t.start();
    }

    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
            Math.hypot(Math.pow(924526789, i), Math.cos(i));
        }
        System.out.println("Priority:"+Thread.currentThread().getPriority());
    }
    public static void main(String[] args) {
        for(int i=0;i<20;i++)
        {
            new MutiThread().start(i%10+1);
        }
    }
}

Java優先順序只是代表搶佔CPU機會大小,優先順序越高,搶佔CPU機會越大,被優先執行的可能性越高,優先順序相差不大,則搶佔CPU機會差別也不大。導致優先順序為9的執行緒比優先順序為10的執行緒先執行。於是在Thread類中設定三個優先順序,建議使用優先順序常量,而不是1到10的隨機數字,程式碼如下:

  public final static int MIN_PRIORITY = 1;

/**
  * The default priority that is assigned to a thread.
  */
public final static int NORM_PRIORITY = 5;

/**
 * The maximum priority that a thread can have.
 */
public final static int MAX_PRIORITY = 10;

/**
 * Returns a reference to the currently executing thread object.
 *
 * @return  the currently executing thread.
 */
}

Lock與synchronized

Lock為顯式鎖,synchronized為內部鎖,程式碼如下:

class  Task
{
    public void dosomething(){
        try {
            Thread.sleep(2000);
        } catch (Exception e) {
            // TODO: handle exception
        }
        StringBuffer sb=new StringBuffer();
        sb.append("執行緒名:"+Thread.currentThread().getName());
        sb.append(",執行緒時間:"+Calendar.getInstance().get(13)+"s");
        System.out.println(sb);
    }
}
//顯示鎖任務
class TaskWithLock extends Task implements Runnable{
private final Lock lock=new ReentrantLock();
    @Override
    public void run() {
        try {
            lock.lock();
            dosomething();
        } finally
        {
            lock.unlock();
        }

    }};
    //內部鎖任務
    class TaskWithSync extends Task implements Runnable{

        @Override
        public void run() {

                synchronized ("A") {
                    dosomething();

                }


        }};

對於同步資源來說,顯式鎖時物件級別的鎖,而內部鎖時類級別的鎖,也就是說lock鎖時跟隨物件的,synchronized鎖時跟隨類
改進方法:把Lock定義為所有執行緒的共享變數。

public static void main(String[] args) {
        //多個執行緒共享鎖
        final Lock lock=new ReentrantLock();
        ……
    }

執行緒池異常處理

Java中執行緒執行的任務介面java.lang.Runnable 要求不丟擲Checked異常,

public interface Runnable {   

    public abstract void run();   
}

那麼如果 run() 方法中丟擲了RuntimeException,將會怎麼處理了?
通常java.lang.Thread物件執行設定一個預設的異常處理方法:

java.lang.Thread.setDefaultUncaughtExceptionHandler(UncaughtExceptionHandler)   

而這個預設的靜態全域性的異常捕獲方法時輸出堆疊。當然,我們可以覆蓋此預設實現,只需要一個自定義的java.lang.Thread.UncaughtExceptionHandler介面實現即可。

public interface UncaughtExceptionHandler {   

    void uncaughtException(Thread t, Throwable e);   
}

而線上程池中卻比較特殊。預設情況下,執行緒池 java.util.concurrent.ThreadPoolExecutor 會Catch住所有異常, 當任務執行完成(java.util.concurrent.ExecutorService.submit(Callable))獲取其結果 時(java.util.concurrent.Future.get())會丟擲此RuntimeException。

/**   
 * Waits if necessary for the computation to complete, and then   
 * retrieves its result.   
 *   
 * @return the computed result   
 * @throws CancellationException if the computation was cancelled   
 * @throws ExecutionException if the computation threw an exception   
 * @throws InterruptedException if the current thread was interrupted while waiting   
 */   
V get() throws InterruptedException, ExecutionException;

其中 ExecutionException 異常即是java.lang.Runnable 或者 java.util.concurrent.Callable 丟擲的異常。

也就是說,執行緒池在執行任務時捕獲了所有異常,並將此異常加入結果中。這樣一來執行緒池中的所有執行緒都將無法捕獲到丟擲的異常。 從而無法通過設定執行緒的預設捕獲方法攔截的錯誤異常。也不同通過 自定義執行緒來完成異常的攔截。好在java.util.concurrent.ThreadPoolExecutor 預留了一個方法,執行在任務執行完畢進行擴充套件(當然也預留一個protected方法beforeExecute(Thread t, Runnable r)):

protected void afterExecute(Runnable r, Throwable t) { } 

此方法的預設實現為空,這樣我們就可以通過繼承或者覆蓋ThreadPoolExecutor 來達到自定義的錯誤處理。

解決辦法如下:

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(11, 100, 1, TimeUnit.MINUTES, //   
        new ArrayBlockingQueue<Runnable>(10000),//   
        new DefaultThreadFactory()) {   

    protected void afterExecute(Runnable r, Throwable t) {   
        super.afterExecute(r, t);   
        printException(r, t);   
    }   
};   

private static void printException(Runnable r, Throwable t) {   
    if (t == null && r instanceof Future<?>) {   
        try {   
            Future<?> future = (Future<?>) r;   
            if (future.isDone())   
                future.get();   
        } catch (CancellationException ce) {   
            t = ce;   
        } catch (ExecutionException ee) {   
            t = ee.getCause();   
        } catch (InterruptedException ie) {   
            Thread.currentThread().interrupt(); // ignore/reset   
        }   
    }   
    if (t != null)   
        log.error(t.getMessage(), t);   
}

使用SimpleThread類

TestThreadPool類是一個測試程式,用來模擬客戶端的請求,當你執行它時,系統首先會顯示執行緒池的初始化資訊,然後提示你從鍵盤上輸入字串,並按下回車鍵,這時你會發現螢幕上顯示資訊,告訴你某個執行緒正在處理你的請求,如果你快速地輸入一行行字串,那麼你會發現執行緒池中不斷有執行緒被喚醒,來處理你的請求,在本例中,我建立了一個擁有10個執行緒的執行緒池,如果執行緒池中沒有可用執行緒了,系統會提示你相應的警告資訊,但如果你稍等片刻,那你會發現螢幕上會陸陸續續提示有執行緒進入了睡眠狀態,這時你又可以傳送新的請求了。
程式碼如下:

//TestThreadPool.java

import java.io.*;
public class TestThreadPool
{
    public static void main(String[] args)
    {
        try
        {
            BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
            String s;
            ThreadPoolManager manager = new ThreadPoolManager(10);
            while((s = br.readLine()) != null)
            {
                manager.process(s);
            }
        }
        catch(IOException e) {}
    }
}

ThreadPoolManager類,顧名思義,它是一個用於管理執行緒池的類,它的主要職責是初始化執行緒池,併為客戶端的請求分配不同的執行緒來進行處理,如果執行緒池滿了,它會對你發出警告資訊。
程式碼如下:

import java.util.*;
  
  class ThreadPoolManager
     
{
     
       private int maxThread;
       public Vector vector;
       public void setMaxThread(int threadCount)
      
    {
           maxThread = threadCount;
          
    }
      
       public ThreadPoolManager(int threadCount)
      
    {
           setMaxThread(threadCount);
          System.out.println("Starting thread pool...");
           vector = new Vector();
           for(int i = 1; i <= 10; i++)
               {
               SimpleThread thread = new SimpleThread(i);
               vector.addElement(thread);
               thread.start();
              
        }
          
    }
      
       public void process(String argument)
      
    {
           int i;
           for(i = 0; i < vector.size(); i++)
              {
              SimpleThread currentThread = (SimpleThread)vector.elementAt(i);
               if(!currentThread.isRunning())
                   {
                  System.out.println("Thread " + (i + 1) + " is processing:" +