『網際網路架構』軟體架構-分散式系列併發程式設計(29)
說說JMM,執行緒,執行緒池。一切都為了分散式而行動!
JMM
- 理解下面的圖
java的檔案,需要進行編譯,通過java編譯程式設計class檔案,class檔案變成位元組碼,裝載到類裝載器中,通過類裝載器進行執行,執行的過程中的一個模型就是下面這個圖。
-
特性
1.可見性
>可見性是指當一個執行緒修改了共享變數後,其他執行緒能夠立即得知這個修改。通過之前對synchronzed記憶體語義進行了分析,當執行緒獲取鎖時會從主記憶體中獲取共享變數的最新值,釋放鎖的時候會將共享變數同步到主記憶體中。從而,synchronized具有可見性。同樣的在volatile分析中,會通過在指令中新增lock指令,以實現記憶體可見性。因此, volatile具有可見性。
2.原子性
原子性是指一個操作是不可中斷的,要麼全部執行成功要麼全部執行失敗,有著“同生共死”的感覺。及時在多個執行緒一起執行的時候,一個操作一旦開始,就不會被其他執行緒所幹擾。
3.順序性
synchronized語義表示鎖在同一時刻只能由一個執行緒進行獲取,當鎖被佔用後,其他執行緒只能等待。
在java記憶體模型中說過,為了效能優化,編譯器和處理器會進行指令重排序;也就是說java程式天然的有序性可以總結為:如果在本執行緒內觀察,所有的操作都是有序的;如果在一個執行緒觀察另一個執行緒,所有的操作都是無序的
由於JVM執行程式的實體是執行緒,而每個執行緒建立時JVM都會為其建立一個工作記憶體(有些地方稱為棧空間),用於儲存執行緒私有的資料,而Java記憶體模型中規定所有變數都儲存在主記憶體,主記憶體是共享記憶體區域,所有執行緒都可以訪問,但執行緒對變數的操作(讀取賦值等)必須在工作記憶體中進行,首先要將變數從主記憶體拷貝的自己的工作記憶體空間,然後對變數進行操作,操作完成後再將變數寫回主記憶體,不能直接操作主記憶體中的變數,工作記憶體中儲存著主記憶體中的變數副本拷貝,前面說過,工作記憶體是每個執行緒的私有資料區域,因此不同的執行緒間無法訪問對方的工作記憶體,執行緒間的通訊(傳值)必須通過主記憶體來完成。
4.Happens-Before原則
什麼是執行緒
執行緒是一個作業系統概念。作業系統負責這個執行緒的建立、掛起、執行、阻塞和終結操作。而作業系統建立執行緒、切換執行緒狀態、終結執行緒都要進行CPU排程——這是一個耗費時間和系統資源的事情。
-
生命週期
>Java當中,執行緒通常都有五種狀態,建立、就緒、執行、阻塞和死亡。
- 建立狀態。在生成執行緒物件,並沒有呼叫該物件的start方法,這是執行緒處於建立狀態。
- 就緒狀態。當呼叫了執行緒物件的start方法之後,該執行緒就進入了就緒狀態,但是此時執行緒排程程式還沒有把該執行緒設定為當前執行緒,此時處於就緒狀態。線上程執行之後,從等待或者睡眠中回來之後,也會處於就緒狀態。
- 執行狀態。執行緒排程程式將處於就緒狀態的執行緒設定為當前執行緒,此時執行緒就進入了執行狀態,開始執行run函式當中的程式碼。
- 阻塞狀態。執行緒正在執行的時候,被暫停,通常是為了等待某個時間的發生(比如說某項資源就緒)之後再繼續執行。sleep,suspend,wait等方法都可以導致執行緒阻塞。
- 死亡狀態。如果一個執行緒的run方法執行結束或者呼叫stop方法後,該執行緒就會死亡。對於已經死亡的執行緒,無法再使用start方法令其進入就緒。
可以用過jstack 或者idea debug快照顯示狀態
- “Low Memory Detector” 負責對可使用記憶體進行檢測,如果發現可用記憶體低,分配新的記憶體空間。
- “CompilerThread0” 用來呼叫JITing,實時編譯裝卸class。
- “Signal Dispatcher” 負責分發內部事件。
- “Finalizer” 負責呼叫Finalizer方法。
- “Reference Handler” 負責處理引用。
- “main” 是主執行緒。
- “VM Thread”, “VM Periodic Task Thread”從名字上看是虛機內部執行緒。
- 狀態描述
- NEW 狀態是指執行緒剛建立, 尚未啟動
- RUNNABLE 狀態是執行緒正在正常執行中, 當然可能會有某種耗時計算/IO等待的操作/CPU時間片切換等, 這個狀態下發生的等待一般是其他系統資源, 而不是鎖, Sleep等
- BLOCKED 這個狀態下, 是在多個執行緒有同步操作的場景, 比如正在等待另一個執行緒的synchronized 塊的執行釋放, 或者可重入的 synchronized塊裡別人呼叫wait() 方法, 也就是這裡是執行緒在等待進入臨界區
- WAITING 這個狀態下是指執行緒擁有了某個鎖之後, 呼叫了他的wait方法, 等待其他執行緒/鎖擁有者呼叫 notify / notifyAll 一遍該執行緒可以繼續下一步操作, 這裡要區分 BLOCKED 和 WATING 的區別, 一個是在臨界點外面等待進入, 一個是在理解點裡面wait等待別人notify, 執行緒呼叫了join方法 join了另外的執行緒的時候, 也會進入WAITING狀態, 等待被他join的執行緒執行結
- TIMED_WAITING 這個狀態就是有限的(時間限制)的WAITING, 一般出現在呼叫wait(long), join(long)等情況下, 另外一個執行緒sleep後, 也會進入 TIMED_WAITING狀態TERMINATED 這個狀態下表示 該執行緒的run方法已經執行完畢了, 基本上就等於死亡了(當時如果執行緒被持久持有, 可能不會被回收)
- 優先順序Priority
執行緒的優先順序是將該執行緒的重要性傳給了排程器、cpu處理執行緒順序有一定的不確定,但是排程器會傾向於優先權高的先執行
- 實現方式
Thread、Runnable、Callable
Runnable和Thread區別:
1、效果上沒區別,寫法上的區別而已。
2、沒有可比性,Thread實現了Runnable介面並進行了擴充套件,我們通常拿來進行比較只是寫法上的比較,而Thread和Runnable的實質是實現的關係,不是同類東西。
3、Callable1.5引入,具有返回值,並且支援泛型,並且不是run方法了是call方法。
- 為什麼有執行緒還要用執行緒池
執行緒的切換需要耗時,執行緒池直接從管理池子裡面的執行緒,效率更高。
Executor框架體系
Java.util.concurrent中的Executor(中文翻譯是執行器)是管理咱們之前的Thread執行緒的。目的是是簡化併發程式設計。
ScheduledThreadPoolExecutor和ThreadPoolExecutor。
構造器:核心數量,任務佇列容器,存活時間,執行緒工廠,處理器
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,//存活時間 TimeUnit unit, BlockingQueue<Runnable> workQueue, //阻塞佇列 ThreadFactory threadFactory,//執行緒工廠 RejectedExecutionHandler handler) {//hander處理
- LinkedBlockingQueue
-
Execute(submit)方法 ->java.util.concurrent.ThreadPoolExecutor#execute
簡要分析一下execute原始碼:執行一個Runnable物件時,首先通過workerCountOf(c)獲取執行緒池中執行緒的數量,如果池中的數量小於corePoolSize就呼叫addWorker新增一個執行緒來執行這個任務。否則通過workQueue.offer(command)方法入列。如果入列成功還需要在一次判斷池中的執行緒數,因為我們建立執行緒池時可能要求核心執行緒數量為0,所以我們必須使用addWorker(null, false)來建立一個臨時執行緒去阻塞佇列中獲取任務來執行。
if (command == null) throw new NullPointerException(); int c = ctl.get(); //判斷是否小於核心數量,是直接新增work成功後直接退出 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get();// 增加失敗後繼續獲取標記 } //判斷是執行狀態並且扔到workQueue裡成功後 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); //再次check判斷執行狀態如果是非執行狀態就移除出去&reject掉 if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) //否則發現可能執行執行緒數是0那麼增加一個null的worker。 addWorker(null, false); } else if (!addWorker(command, false)) //直接增加worker如果不成功直接reject reject(command);
-
addWorker方法
>總結:
//這個兩個for迴圈主要是判斷能否增加一個執行緒,
//外迴圈來判斷執行緒池的狀態
//內迴圈主要是個增加執行緒數的CAS操作
第一個引數firstTask不為null,則建立的執行緒就會先執行firstTask物件,然後去阻塞佇列中取任務,否直接到阻塞佇列中獲取任務來執行。第二個引數,core引數為真,則用corePoolSize作為池中執行緒數量的最大值;為假,則以maximumPoolSize作為池中執行緒數量的最大值。
T.start方法 run方法執行>呼叫了runWorker方法
retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false;// 兩種情況1.如果非關閉狀態2.不是這種情況(停止狀態並且是null物件並且workQueue不等於null) for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false;// 判斷是否飽和容量了 if (compareAndIncrementWorkerCount(c)) //增加一個work數量 然後跳出去 break retry; c = ctl.get();//增加work失敗後繼續遞迴 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask);//增加一個worker final Thread t = w.thread; if (t != null) {//判斷是否 為null final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired.鎖定後並重新檢查下 是否存線上程工廠的失敗或者鎖定前的關閉 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w);//增加work int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //本次要是新增加work成功就呼叫start執行 t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted;
-
runWorker
>總結:獲取任務、呼叫執行緒run方法
Thread wt = Thread.currentThread();//1.取到當前執行緒 Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { //獲取任務 看看是否能拿到 點進入 Task中的佇列poll take是真正拿到task //workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : 超時取出 w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted.This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt();// 確保執行緒是能中斷的 try { beforeExecute(wt, task); //開始任務前的鉤子 Throwable thrown = null; try { task.run();//執行任務 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); //任務後的鉤子 } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); //看這兒 }
- processWorkerExit
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w);//移除work } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { //判斷是否還有任務 if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false);
執行緒池的拒絕策略
四種策略 預設是AbortPolicy
名稱 | 意義 |
---|---|
AbortPolicy (預設) | 當任務新增到執行緒池中被拒絕時,它將丟擲 RejectedExecutionException 異常。 |
CallerRunsPolicy | 當任務新增到執行緒池中被拒絕時,會線上程池當前正在執行的Thread執行緒池中處理被拒絕的任務。 |
DiscardOldestPolicy | 當任務新增到執行緒池中被拒絕時,執行緒池會放棄等待佇列中最舊的未處理任務,然後將被拒絕的任務新增到等待佇列中 |
DiscardPolicy | 當任務新增到執行緒池中被拒絕時,執行緒池將丟棄被拒絕的任務。 |
-
ScheduledThreadPoolExecutor的實現
>ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor並且實現了ScheduledExecutorService介面。ScheduledExecutorService定義了使用ScheduledThreadPoolExecutor提交任務的介面schedule。schedule是一組返回值和引數不同的overwrite介面。以供使用者選擇提交任務的模式,比如是一次性的定時任務還是週期性的任務,需不需要返回結果等等。不管是哪個schedule介面,他的實現功能主要是把使用者提交的任務和模式封裝成ScheduledFutureTask,然後呼叫delayedExecute(Runnable command)。所以delayedExecute方法是真正的處理任務的方法。
PS:併發始終還是圍繞這執行緒的判斷來進行的一步一步的操作,所以提高服務程式效率的一個手段就是儘可能減少建立和銷燬物件的次數,特別是一些很耗資源的物件建立和銷燬。這塊只做瞭解吧。我已經寫懵逼了。
>>原創文章,歡迎轉載。轉載請註明:轉載自IT人故事會,謝謝!
>>原文連結地址:上一篇:已是最新文章