ThreadPoolExecutor原始碼解析
無論是直接還是間接的建立執行緒池,歸根結底都是通過ThreadPoolExecutor來建立執行緒池並且配置執行緒池特性的,需要執行新任務時,通過ThreadPoolExecutor的execute方法提交任務,具體如下所示。
//Executors.java public class Executors { public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); } public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { //ScheduledThreadPoolExecutor extends ThreadPoolExecutor return new ScheduledThreadPoolExecutor(corePoolSize); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } //...... }
//自定義執行緒池 ThreadPoolExecutor threadPoolExecutor; threadPoolExecutor = ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.DiscardOldestPolicy()); //提交新任務 threadPoolExecutor.execute(task); //新任務 Runnable task = new Runnable() { @Override public void run() { try { Thread.sleep(2000); Log.e("ThreadPoolExecutor","run task:"+System.currentTimeMillis()); } catch (InterruptedException e) { e.printStackTrace(); } } };
那麼ThreadPoolExecutor內部是如何提交新任務,如何執行任務的呢?對於這個流程就是我們這裡要進行解析的。
ThreadPoolExecutor的構造及引數含義
在進行解析之前,我們先看看ThreadPoolExecutor的構造以及構造中用來配置執行緒池的引數含義。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
(1)corePoolSize:執行緒的核心執行緒數,預設一直存活,即使處於閒置狀態。若將allowCoreThreadTimeOut設定為true,閒置的核心執行緒在等待新任務時會有超時策略(即受到keepAliveTime限制)。
(2)maximumPoolSize:執行緒池能容納的最大執行緒數。
(3)keepAliveTime:非核心執行緒閒置時的超時時長,超過這個時間,非核心執行緒就會被回收。
(4)unit:keepAliveTime的時間單位
(5)workQueue:執行緒池中的任務佇列,通過執行緒池的execute方法提交的Runnable物件會儲存在這個引數中。常用任務佇列:
LinkedBlockingQueue(無界佇列):基於連結串列的阻塞佇列,按照 FIFO 原則對元素進行排序。
ArrayBlockingQueue(有界佇列):初始時需要指定佇列大小,基於陣列的阻塞佇列,按照 FIFO 原則對元素進行排序。
SynchronousQueue(同步佇列):不儲存元素,每個插入操作必須等待另一個執行緒呼叫移除操作,否則插入操作會一直阻塞。
DelayedWorkQueue(有序佇列):會通過每個任務按照距離下次執行時間間隔的大小來排序。
PriorityBlockingQueue(優先順序佇列):具有優先順序的阻塞佇列。
(6)threadFactory:執行緒工廠,為執行緒池提供建立執行緒的功能。預設情況下,執行緒池使用Executors.defaultThreadFactory()方法返回的執行緒工廠實現類。
(7)handler:拒絕策略。當執行緒池中執行緒達到或者超過最大執行緒數並且任務佇列已滿,這是再提交新任務時,就會拒絕執行此任務。預設是AbortPolicy,丟擲異常RejectedExecutionException。
AbortPolicy:丟棄新任務,並丟擲 RejectedExecutionException
DiscardPolicy:不做任何操作,直接丟棄新任務
DiscardOldestPolicy:丟棄佇列隊首的元素,並執行新任務
CallerRunsPolicy:由呼叫執行緒執行新任務
任務提交
瞭解了ThreadPoolExecutor的構造後,我們回到正題。從任務提交的方法execute入手。從原始碼或者原始碼的註解可知,任務提交會經過3個步驟,大致如下:
(1)若執行緒池有效執行緒數小於核心執行緒數量(corePoolSize),呼叫addWorker建立一個核心執行緒來執行任務。若執行緒池有效執行緒數大於等於核心執行緒數或者建立核心執行緒失敗,則執行步驟(2)。
(2)若執行緒池處於RUNNING狀態,嘗試將任務加入任務佇列中,若加入佇列成功,則嘗試進行double-check,若加入失敗,則執行步驟(3)。
(3)若執行緒池不是RUNNING狀態或者加入佇列失敗,嘗試建立非核心新執行緒直到有效執行緒達到maximumPoolSize,如果失敗,則呼叫reject()方法執行拒絕策略。
//ThreadPoolExecutor.java public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task.The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread.If it fails, we know we are shut down or saturated * and so reject the task. */ //ctl(AtomicInteger原子操作,執行緒安全) 包含32位資料,高3位存放執行緒池狀態,低29位存執行緒數 int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { //當前執行緒池中有效的執行緒數量小於核心執行緒數(corePoolSize)時,建立一個新的核心執行緒來執行任務 if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { //執行緒池處於RUNNING狀態,並且將新任務加入任務佇列中 int recheck = ctl.get(); //再次檢查執行緒池狀態 if (! isRunning(recheck) && remove(command)) //不處於RUNNING,並且成功將新任務從任務佇列中移除,執行拒絕策略 reject(command); else if (workerCountOf(recheck) == 0) //處於SHUTDOWN,有效執行緒數為0時,任務佇列裡可能還有任務沒執行的任務,建立新的非核心執行緒 addWorker(null, false); } else if (!addWorker(command, false)) //執行緒池不是RUNNING狀態或者加入佇列失敗,嘗試建立新的非核心執行緒失敗,執行緒池已經shutdown或者已經飽和了,所以拒絕任務 reject(command); }
從execute方法中,我們大致瞭解了一個任務提交,要麼直接建立一個執行緒來處理任務,要麼就是放到任務佇列裡面進行等待處理,要麼就是拒絕處理。
建立工作執行緒,啟動執行緒執行任務
那麼是如何建立執行緒來執行任務的呢?從addWorker方法,可以推出大致流程:
(1)判斷執行緒池狀態,是否需要建立工作執行緒,若需要,則原子性的增加當前有效工作執行緒數量。
(2)將任務封裝成一個Worker物件,Worker實現了Runnable介面,並在構造中通過ThreadFactory建立與Worker物件對應的執行緒,最後將這個Worker物件新增進workers這個HashSet集合中。
(3)啟動Workerd物件中對應的執行緒,執行Worker中的Run方法,從而間接呼叫runWorker方法。在runWorker方法中會去呼叫任務(即在execute中提交的Runnable物件)的run方法,除此之外,當本次任務完成後,執行緒還會繼續從阻塞任務佇列中取任務執行,直到阻塞佇列為空(即任務全部完成)。
(4)如果工作執行緒建立失敗了,呼叫addWorkerFailed方法,回滾工作執行緒的建立,將worker從workers集合中刪除,並原子性的減少工作執行緒數量。
//ThreadPoolExecutor.java private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); //獲取執行緒池狀態 int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { //有效工作執行緒數 int wc = workerCountOf(c); //有效工作執行緒數量大於等於最大容量 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //遞增有效工作執行緒的數量 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get();// Re-read ctl // 此次的執行緒池狀態與上次獲取的狀態不相同 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //將任務封裝到Worker中 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. //獲取執行緒池狀態 int rs = runStateOf(ctl.get()); // 如果執行緒池處於RUNNING狀態執行新增任務操作,或執行緒池處於SHUTDOWN 狀態,firstTask 為空(任務佇列不為空,需要新增工作執行緒來執行任務) if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); //將worker新增到workers集合(workers:HashSet<Worker>) workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //啟動執行緒,執行任務,這裡會呼叫Worker中的Run方法,從而間接呼叫runWorker方法 t.start(); workerStarted = true; } } } finally { //工作執行緒建立失敗 if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
封裝任務
將初始任務封裝到Worker中,並建立與Worker對應的工作執行緒。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in.Null if factory fails. */ final Thread thread;//worker持有的工作執行緒 /** Initial task to run.Possibly null. */ Runnable firstTask;//初始化時的第一個具體任務,可能是null /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; //建立執行緒,並傳入將當前實現了Runnable介面的Worker物件 this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker. */ public void run() { //執行任務 runWorker(this); } //...... }
處理任務
工作執行緒在runWorker方法中處理任務,執行緒完成本次任務後,繼續從阻塞佇列中取任務繼續執行,直到阻塞佇列中任務全部被完成。
//ThreadPoolExecutor.java final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //任務不為null或者阻塞佇列還存在任務 //getTask()從阻塞佇列中獲取任務 while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted.This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) //中斷wt執行緒(當前執行緒) wt.interrupt(); try { //beforeExecute沒有具體實現,可以根據需要重寫這個方法,在任務執行之前呼叫 beforeExecute(wt, task); Throwable thrown = null; try { //執行任務 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //afterExecute沒有具體實現,可以根據需要重寫這個方法,在任務完成之後呼叫 afterExecute(task, thrown); } } finally { task = null; //增加worker對應的執行緒完成的任務數量 w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { //工作執行緒退出 processWorkerExit(w, completedAbruptly); } }
從阻塞佇列中獲取任務。
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. //當執行緒池處於STOP及以上狀態時,釋放該執行緒。 //當執行緒處於SHUTDOWN 狀態時,並且workQueue請求佇列為空,釋放該執行緒。 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null;//返回null時,runWorker中會執行processWorkerExit將工作執行緒退出 } int wc = workerCountOf(c); // Are workers subject to culling? //如果呼叫allowCoreThreadTimeOut方法設定為true,則所有執行緒都有超時時間。 //如果當前執行緒數大於核心執行緒數則該執行緒有超時時間。 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { //減少工作執行緒數 if (compareAndDecrementWorkerCount(c)) return null; continue; } try { //從阻塞佇列中取任務,根據情況選擇一直阻塞等待還是超時等待 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
總結:
(1)ThreadPoolExecutor是執行緒池的真正實現。
(2)從任務的提交到執行大致流程是:execute提交任務->addWorker建立工作執行緒(通過Worker分裝初始任務以及建立工作執行緒)->runWorker執行任務(通過getTask從阻塞佇列中獲取任務,任務完成後好會繼續從佇列中獲取任務,直到阻塞佇列為空即任務全部被完成)。
(3)ThreadPoolExecutor執行任務大致遵循的規則:a.如果執行緒池中的執行緒數未達到核心線的程數量,則建立一個核心執行緒來執行任務;b.若執行緒池中的執行緒數達到或者超過了核心執行緒數,阻塞佇列未滿,這時任務會被插入阻塞佇列進行排隊等待。c.若b中阻塞佇列已滿,但是執行緒池中的執行緒數未超過執行緒池規定的最大容量,這時會建立一個非核心執行緒來執行任務。d.若c中執行緒數已經達到規定的最大容量,這是若有新任務提交,就會拒絕執行此任務。
參考資料:
ofollow,noindex">https://blog.csdn.net/u014634338/article/details/79098437
https://cloud.tencent.com/developer/article/1109643
《Android 開發藝術探索》