1. 程式人生 > >ThreadPoolExecutor的分析(二)

ThreadPoolExecutor的分析(二)

ase 任務 second nbsp while循環 clear ech 嘗試 turn

說明:本作者是文章的原創作者,轉載請註明出處:本文地址:http://www.cnblogs.com/qm-article/p/7859620.html

內部類Worker的分析

從源碼可知。該內部類繼承了AQS,且實現了runnable接口,說明,此類擁有鎖的功能,且能充當線程使用,在前面的博文<線程的使用及ThreadPoolExecutor的分析(一)>中的addworker方法中當滿足條件時會new Worker(firsttask),其內部發生了先是設置狀態值為-1,其作用是為了在運行runWorker方法之前禁止中斷,同時下一步構建了新的線程,並傳入this參數

 Worker(Runnable firstTask) {
            setState(
-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }

該類的其他方法作用如下

isHeldExclusively(),檢查是否排他,即檢查是否上了鎖,若true則上鎖狀態,否則為不上鎖狀態

tryAcquire(int) ,該方法為AbstractQueuedSynchronizer,裏的,該類重寫了該方法,其作用是檢查當前線程是嘗試獲取鎖,通過cas操作來保證原子性,若返回true,則說明處於無鎖狀態,隨機設置當前線程為此鎖的主人

setExclusiveOwnerThread(Thread.currentThread()),設置當前線程為此鎖的擁有者

tryRelease(int) ,嘗試釋放鎖,其內部將此鎖的擁有者設為null,即釋放了鎖

lock() ,加鎖

tryLock(),嘗試加鎖

unlock() 解鎖

isLocked(),返回是否已經被其他線程獲取了鎖

interruptIfStarted() ,若開始了,則中斷線程

 void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }

  其中getState()>=0,在開始的構造方法中設置state的值為-1,就是防止線程被中斷,

在這裏分析下tryAcquire(int)中的cas操作

點進去看其源碼

protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

  其屬於AQS中的方法,其中this,代表當前調用該方法的類的實例,在這裏代表worker的實例,其中stateOffset代表要被更新的值,expect,代表內存中的舊值,update代表被更新後的值,

cas大致原理就是若內存中的值,與即將被更新的值一樣,則用update取代該值,否則不取代,舉個例子,如數據庫表中,有一個version,和一個name字段,在開始時,你查詢出了這個表的所有字段

之後,version的值已經存在內存中了,你突然要改變name值,此時,你也怕在這個期間有其他線程也更改了該name字段的值,此時你只要查詢一次該表,若此時查詢出來的version值,與已經在內存

中的version值一樣,則更新name字段的值,否則不更新,cas操作也常用於樂觀鎖的實現。

其中stateOffset的值

       stateOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("state"));

  該段代碼的作用獲取AbstractQueuedSynchronizer類中state字段的值,再賦值給stateOffset,該操作時線程安全的,private volatile int state;這是state的定義,其默認值為0,

protected final void setState(int newState) {
        state = newState;
    }

也可以通過以上set方法來進行重新賦值

線程啟動的入口

在addWorker的t.start()中

   if (workerAdded) {
        t.start();
        workerStarted = true;
   }

表明一啟動該thread,則會調用該復寫的run方法

 public void run() {
            runWorker(this);
  }

其中runworker()源碼如下

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

該方法大致作用是遍歷隊列,執行線程任務

先介紹該方法內的其他方法作用

getTask(),其作用是從阻塞隊列中取出線程任務

runStateAtLeast(int,int),判斷線程池至少處於什麽狀態,前面博文中,有寫線程池的幾個狀態及其大小
beforeExecute(Thread t, Runnable r),提供給ThreadPoolExecutor子類去實現
afterExecute(Runnable r, Throwable t),也是提供給子類去實現
processWorkerExit(Worker w, boolean completedAbruptly) 主要是為了清除和記錄以及死亡的worker線程

在runWorker方法中,通過while循環,去取當前worker中的線程任務firstWorker,和阻塞隊列中的線程任務
第一個if中的判斷是為了檢測線程池的狀態及線程是否中斷了,如果是,則中斷當前線程
try裏面的run方法是執行獲取到的線程的任務,將完成的線程任務進行累加

ThreadPoolExecutor的分析(二)