1. 程式人生 > >java執行緒池詳細入門教程即原始碼解析

java執行緒池詳細入門教程即原始碼解析

##1、執行緒池概念
     執行緒池是執行緒的集合,通過執行緒池我們不需要自己建立執行緒,將任務提交給執行緒池即可。為什麼要使用執行緒池,首先,使用執行緒池可以重複利用已有的執行緒繼續執行任務,避免執行緒在建立和銷燬時造成的消耗。其次,由於沒有執行緒建立和銷燬時的消耗,可以提高系統響應速度。最後,通過執行緒可以對執行緒進行合理的管理,根據系統的承受能力調整可執行執行緒數量的大小等。

##2、執行緒池使用示例
     這裡做兩個測試,首先寫一個實現Runnable介面的類:

class MyTask implements Runnable{

	@Override
	public void run() {
		// TODO Auto-generated method stub
		System.out.println(Thread.currentThread().getName());
	}
	
}

main中程式碼:

		ExecutorService pool = Executors.newFixedThreadPool(5);
		Runnable myTask = new MyTask();
		Runnable myTask1 = new MyTask();
		Runnable myTask2 = new MyTask();
		Runnable myTask3 = new MyTask();
		Runnable myTask4 = new MyTask();
		pool.execute(myTask);
		pool.execute(myTask1);
		pool.execute(myTask2);
		pool.execute(myTask3);
		pool.execute(myTask4);

這裡newFixedThreadPool中的5代表可執行執行緒數量。所以結果:

pool-1-thread-3
pool-1-thread-1
pool-1-thread-4
pool-1-thread-5
pool-1-thread-2

可以看到這裡又五個執行緒在執行。這裡如果將可執行執行緒數量變成2,那麼結果如下:

pool-1-thread-2
pool-1-thread-1
pool-1-thread-2
pool-1-thread-1
pool-1-thread-2

可以看到執行緒1和執行緒2都被重複利用了,沒有建立更多的執行緒出來。

##3、執行緒池的狀態
     執行緒池的狀態及狀態間的切換如下圖所示:
這裡寫圖片描述


可以看到執行緒池有五種狀態RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED,不同於執行緒的轉檯切換的是,執行緒池的狀態可操作的其實只有兩種狀態,RUNNING和TERMINATED,執行緒池一旦建立就會處於RUNNING狀態,而TERMINATED是終止,執行緒池呼叫shutdown或者shutdownNow執行緒池其實就會慢慢的切換到終止狀態,中間三種狀態只是執行緒由執行態過渡到終止態的中間狀態,用來進行一些處理事項的。RUNNING狀態是正常執行狀態,能夠接受新的任務和處理任務,SHUTDOWN是關閉狀態,不能接受新的任務,將正在執行的任務處理完畢後進入到TIDYING整理狀態,STOP是停止狀態,不接受任務,也不處理任務,直接中斷任務,然後進入到TIDYING整理狀態,TIDYING狀態會自動呼叫terminate方法,呼叫完後進入到終止狀態。

##4、通過原始碼理解執行緒池
     首先是執行緒的建立,執行緒的建立一般通過一個工廠類建立,

ExecutorService pool = Executors.newFixedThreadPool(2);

進去看這個函式:

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

可以看到還是直接new了一個執行緒池出來,再看這個執行緒池的構造方法:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

首先對這幾個引數進行說明:
corePoolSize:核心池大小,表示執行緒池正常情況下能執行的執行緒最大多少。
maximumPoolSize:最大池大小,表示執行緒池最大能執行的執行緒數。
keepAliveTime:表示執行緒執行任務後的空閒存活時間,過了這個時間執行緒將被銷燬,這個引數的存在也直接實現了概述中所說的前兩點優點,執行緒的消耗可以分為A,B,C三個階段,A代表建立,B代表執行,C代表消耗,如果有很多個任務需要執行時,B執行所帶來的消耗代價小於A,C所帶來的消耗代價,那麼就通過這個引數,讓執行緒保持一段存活時間,方便執行後面的任務。
unit:執行緒空閒存活時間的單位。
workQueue:這個是任務佇列,將後面的任務加入到這個佇列中。
threadFactory:真正用於建立執行緒的引數。
handler:表示已經不能在接收任務時,呼叫的處理類。
在這個構造方法中分別對這幾個引數進行了初始化。**這裡再介紹下這幾個引數的關係,當用戶建立任務時,如果當前的執行的執行緒數小於核心池數量,那麼會建立新的執行緒來執行這個任務,如果當前執行的執行緒數大於核心池的數量,那麼就將任務新增到任務佇列中,如果任務佇列已經裝滿且當前執行執行緒數小於最大池的數量,那麼就再建立執行緒來執行這個任務,如果已經等於了最大池的數量,那麼將拒絕這個任務並且執行handle處理類。**執行緒的建立介紹了再來看任務的執行方法execute:

    public void execute(Runnable command) {
	    //當任務為空,丟擲空指標異常
        if (command == null)
            throw new NullPointerException();
        //這個是獲取到ctl引數的值,這個值是32位的
        //高三位儲存了執行緒池的狀態,其餘位儲存了執行緒池中執行的執行緒的數量
        int c = ctl.get();
        //當執行緒執行數量小於核心池時
        if (workerCountOf(c) < corePoolSize) {
            //建立新的執行緒來執行這個任務
            if (addWorker(command, true))
            //然後直接返回
                return;
            c = ctl.get();
        }
        //如果當前執行執行緒已經超過了核心池大小,並且執行緒池是執行的
        //將任務加入到任務佇列中
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //如果執行緒池沒有運行了,將任務移除
            if (! isRunning(recheck) && remove(command))
            //呼叫方法處理
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //如果新增佇列失敗了,那麼再建立一個新的執行緒
        else if (!addWorker(command, false))
        //如果失敗了,也就是最大池滿了,那麼呼叫拒絕的處理方法。
            reject(command);
    }

這裡多次呼叫到addWorker方法,這裡繼續看addWorker方法:

private boolean addWorker(Runnable firstTask, boolean core) {
	//前面這一片都是驗證性的工作,可以不重點看
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
	//從這裡開始重點看
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
          //獲取可重入互斥鎖
            final ReentrantLock mainLock = this.mainLock;
            //建立一個新的執行緒來之心這個任務
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
            //執行緒不為空,加鎖
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                            //將worker新增到set集合中
                        workers.add(w);
                        //獲取集合的worker數量
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                //worker新增成功執行任務
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

到這裡執行緒池的入門介紹結束。