死磕 java執行緒系列之執行緒池深入解析——未來任務執行流程
(手機橫屏看原始碼更方便)
注:java原始碼分析部分如無特殊說明均基於 java8 版本。
注:執行緒池原始碼部分如無特殊說明均指ThreadPoolExecutor類。
簡介
前面我們一起學習了執行緒池中普通任務的執行流程,但其實執行緒池中還有一種任務,叫作未來任務(future task),使用它您可以獲取任務執行的結果,它是怎麼實現的呢?
建議學習本章前先去看看彤哥之前寫的《死磕 java執行緒系列之自己動手寫一個執行緒池(續)》,有助於理解本章的內容,且那邊的程式碼比較短小,學起來相對容易一些。
問題
(1)執行緒池中的未來任務是怎麼執行的?
(2)我們能學到哪些比較好的設計模式?
(3)對我們未來學習別的框架有什麼幫助?
來個栗子
我們還是從一個例子入手,來講解來章的內容。
我們定義一個執行緒池,並使用它提交5個任務,這5個任務分別返回0、1、2、3、4,在未來的某一時刻,我們再取用它們的返回值,做一個累加操作。
public class ThreadPoolTest02 { public static void main(String[] args) throws ExecutionException, InterruptedException { // 新建一個固定5個執行緒的執行緒池 ExecutorService threadPool = Executors.newFixedThreadPool(5); List<Future<Integer>> futureList = new ArrayList<>(); // 提交5個任務,分別返回0、1、2、3、4 for (int i = 0; i < 5; i++) { int num = i; // 任務執行的結果用Future包裝 Future<Integer> future = threadPool.submit(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("return: " + num); // 返回值 return num; }); // 把future新增到list中 futureList.add(future); } // 任務全部提交完再從future中get返回值,並做累加 int sum = 0; for (Future<Integer> future : futureList) { sum += future.get(); } System.out.println("sum=" + sum); } }
這裡我們思考兩個問題:
(1)如果這裡使用普通任務,要怎麼寫,時間大概是多少?
如果使用普通任務,那麼就要把累加操作放到任務裡面,而且並不是那麼好寫(final的問題),總時間大概是1秒多一點。但是,這樣有一個缺點,就是累加操作跟任務本身的內容耦合到一起了,後面如果改成累乘,還要修改任務的內容。
(2)如果這裡把future.get()放到for迴圈裡面,時間大概是多少?
這個問題我們先不回答,先來看原始碼分析。
submit()方法
submit方法,它是提交有返回值任務的一種方式,內部使用未來任務(FutureTask)包裝,再交給execute()去執行,最後返回未來任務本身。
public <T> Future<T> submit(Callable<T> task) { // 非空檢測 if (task == null) throw new NullPointerException(); // 包裝成FutureTask RunnableFuture<T> ftask = newTaskFor(task); // 交給execute()方法去執行 execute(ftask); // 返回futureTask return ftask; } protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { // 將普通任務包裝成FutureTask return new FutureTask<T>(callable); }
這裡的設計很巧妙,實際上這兩個方法都是在AbstractExecutorService這個抽象類中完成的,這是模板方法的一種運用。
我們來看看FutureTask的繼承體系:
FutureTask實現了RunnableFuture介面,而RunnableFuture介面組合了Runnable介面和Future介面的能力,而Future介面提供了get任務返回值的能力。
問題:submit()方法返回的為什麼是Future介面而不是RunnableFuture介面或者FutureTask類呢?
答:這是因為submit()返回的結果,對外部呼叫者只想暴露其get()的能力(Future介面),而不想暴露其run()的能力(Runaable介面)。
FutureTask類的run()方法
經過上一章的學習,我們知道execute()方法最後呼叫的是task的run()方法,上面我們傳進去的任務,最後被包裝成了FutureTask,也就是說execute()方法最後會呼叫到FutureTask的run()方法,所以我們直接看這個方法就可以了。
public void run() {
// 狀態不為NEW,或者修改為當前執行緒來執行這個任務失敗,則直接返回
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
// 真正的任務
Callable<V> c = callable;
// state必須為NEW時才執行
if (c != null && state == NEW) {
// 執行的結果
V result;
boolean ran;
try {
// 任務執行的地方【本文由公從號“彤哥讀原始碼”原創】
result = c.call();
// 已執行完畢
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
// 處理異常
setException(ex);
}
if (ran)
// 處理結果
set(result);
}
} finally {
// 置空runner
runner = null;
// 處理中斷
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
可以看到程式碼也比較簡單,先做狀態的檢測,再執行任務,最後處理結果或異常。
執行任務這裡沒啥問題,讓我們看看處理結果或異常的程式碼。
protected void setException(Throwable t) {
// 將狀態從NEW置為COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 返回值置為傳進來的異常(outcome為呼叫get()方法時返回的)
outcome = t;
// 最終的狀態設定為EXCEPTIONAL
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
// 呼叫完成方法
finishCompletion();
}
}
protected void set(V v) {
// 將狀態從NEW置為COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 返回值置為傳進來的結果(outcome為呼叫get()方法時返回的)
outcome = v;
// 最終的狀態設定為NORMAL
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
// 呼叫完成方法
finishCompletion();
}
}
咋一看,這兩個方法似乎差不多,不同的是出去的結果不一樣且狀態不一樣,最後都呼叫了finishCompletion()方法。
private void finishCompletion() {
// 如果佇列不為空(這個佇列實際上為呼叫者執行緒)
for (WaitNode q; (q = waiters) != null;) {
// 置空佇列
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
// 呼叫者執行緒
Thread t = q.thread;
if (t != null) {
q.thread = null;
// 如果呼叫者執行緒不為空,則喚醒它
// 【本文由公從號“彤哥讀原始碼”原創】
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
// 鉤子方法,子類重寫
done();
// 置空任務
callable = null; // to reduce footprint
}
整個run()方法總結下來:
(1)FutureTask有一個狀態state控制任務的執行過程,正常執行結束state從NEW->COMPLETING->NORMAL,異常執行結束state從NEW->COMPLETING->EXCEPTIONAL;
(2)FutureTask儲存了執行任務的執行緒runner,它是執行緒池中的某個執行緒;
(3)呼叫者執行緒是儲存在waiters佇列中的,它是什麼時候設定進去的呢?
(4)任務執行完畢,除了設定狀態state變化之外,還要喚醒呼叫者執行緒。
呼叫者執行緒是什麼時候儲存在FutureTask中(waiters)的呢?檢視構造方法:
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
發現並沒有相關資訊,我們再試想一下,如果呼叫者不呼叫get()方法,那麼這種未來任務是不是跟普通任務沒有什麼區別?確實是的哈,所以只有呼叫get()方法了才有必要儲存呼叫者執行緒到FutureTask中。
所以,我們來看看get()方法中是什麼鬼。
FutureTask類的get()方法
get()方法呼叫時如果任務未執行完畢,會阻塞直到任務結束。
public V get() throws InterruptedException, ExecutionException {
int s = state;
// 如果狀態小於等於COMPLETING,則進入佇列等待
if (s <= COMPLETING)
s = awaitDone(false, 0L);
// 返回結果(異常)
return report(s);
}
是不是很清楚,如果任務狀態小於等於COMPLETING,則進入佇列等待。
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// 我們這裡假設不帶超時
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
// 處理中斷
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
// 4. 如果狀態大於COMPLETING了,則跳出迴圈並返回
// 這是自旋的出口
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
// 如果狀態等於COMPLETING,說明任務快完成了,就差設定狀態到NORMAL或EXCEPTIONAL和設定結果了
// 這時候就讓出CPU,優先完成任務
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
// 1. 如果佇列為空
else if (q == null)
// 初始化佇列(WaitNode中記錄了呼叫者執行緒)
q = new WaitNode();
// 2. 未進入佇列
else if (!queued)
// 嘗試入隊
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 超時處理
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
// 3. 阻塞當前執行緒(呼叫者執行緒)
else
// 【本文由公從號“彤哥讀原始碼”原創】
LockSupport.park(this);
}
}
這裡我們假設呼叫get()時任務還未執行,也就是其狀態為NEW,我們試著按上面標示的1、2、3、4走一遍邏輯:
(1)第一次迴圈,狀態為NEW,直接到1處,初始化佇列並把呼叫者執行緒封裝在WaitNode中;
(2)第二次迴圈,狀態為NEW,佇列不為空,到2處,讓包含呼叫者執行緒的WaitNode入隊;
(3)第三次迴圈,狀態為NEW,佇列不為空,且已入隊,到3處,阻塞呼叫者執行緒;
(4)假設過了一會任務執行完畢了,根據run()方法的分析最後會unpark呼叫者執行緒,也就是3處會被喚醒;
(5)第四次迴圈,狀態肯定大於COMPLETING了,退出迴圈並返回;
問題:為什麼要在for迴圈中控制整個流程呢,把這裡的每一步單獨拿出來寫行不行?
答:因為每一次動作都需要重新檢查狀態state有沒有變化,如果拿出去寫也是可以的,只是程式碼會非常冗長。這裡只分析了get()時狀態為NEW,其它的狀態也可以自行驗證,都是可以保證正確的,甚至兩個執行緒交叉執行(斷點的技巧)。
OK,這裡返回之後,再看看是怎麼處理最終的結果的。
private V report(int s) throws ExecutionException {
Object x = outcome;
// 任務正常結束
if (s == NORMAL)
return (V)x;
// 被取消了
if (s >= CANCELLED)
throw new CancellationException();
// 執行異常
throw new ExecutionException((Throwable)x);
}
還記得前面分析run的時候嗎,任務執行異常時是把異常放在outcome裡面的,這裡就用到了。
(1)如果正常執行結束,則返回任務的返回值;
(2)如果異常結束,則包裝成ExecutionException異常丟擲;
通過這種方式,執行緒中出現的異常也可以返回給呼叫者執行緒了,不會像執行普通任務那樣呼叫者是不知道任務執行到底有沒有成功的。
其它
FutureTask除了可以獲取任務的返回值以外,還能夠取消任務的執行。
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
這裡取消任務是通過中斷執行執行緒來處理的,有興趣的同學可以自己分析一下。
回答開篇
如果這裡把future.get()放到for迴圈裡面,時間大概是多少?
答:大概會是5秒多一點,因為每提交一個任務,都要阻塞呼叫者執行緒直到任務執行完畢,每個任務執行都是1秒多,所以總時間就是5秒多點。
總結
(1)未來任務是通過把普通任務包裝成FutureTask來實現的。
(2)通過FutureTask不僅能夠獲取任務執行的結果,還有感知到任務執行的異常,甚至還可以取消任務;
(3)AbstractExecutorService中定義了很多模板方法,這是一種很重要的設計模式;
(4)FutureTask其實就是典型的異常呼叫的實現方式,後面我們學習到Netty、Dubbo的時候還會見到這種設計思想的。
彩蛋
RPC框架中非同步呼叫是怎麼實現的?
答:RPC框架常用的呼叫方式有同步呼叫、非同步呼叫,其實它們本質上都是非同步呼叫,它們就是用FutureTask的方式來實現的。
一般地,通過一個執行緒(我們叫作遠端執行緒)去呼叫遠端介面,如果是同步呼叫,則直接讓呼叫者執行緒阻塞著等待遠端執行緒呼叫的結果,待結果返回了再返回;如果是非同步呼叫,則先返回一個未來可以獲取到遠端結果的東西FutureXxx,當然,如果這個FutureXxx在遠端結果返回之前呼叫了get()方法一樣會阻塞著呼叫者執行緒。
有興趣的同學可以先去預習一下dubbo的非同步呼叫(它是把Future扔到RpcContext中的)。
歡迎關注我的公眾號“彤哥讀原始碼”,檢視更多原始碼系列文章, 與彤哥一起暢遊原始碼的海洋。
相關推薦
死磕 java執行緒系列之執行緒池深入解析——未來任務執行流程
(手機橫屏看原始碼更方便) 注:java原始碼分析部分如無特殊說明均基於 java8 版本。 注:執行緒池原始碼部分如無特殊說明均指ThreadPoolExecutor類。 簡介 前面我們一起學習了執行緒池中普通任務的執行流程,但其實執行緒池中還有一種任務,叫作未來任務(future task),使用它
死磕 java執行緒系列之執行緒池深入解析——普通任務執行流程
(手機橫屏看原始碼更方便) 注:java原始碼分析部分如無特殊說明均基於 java8 版本。 注:執行緒池原始碼部分如無特殊說明均指ThreadPoolExecutor類。 簡介 前面我們一起學習了Java中執行緒池的體系結構、構造方法和生命週期,本章我們一起來學習執行緒池中普通任務到底是怎麼執行的。
死磕 java執行緒系列之執行緒池深入解析——定時任務執行流程
(手機橫屏看原始碼更方便) 注:java原始碼分析部分如無特殊說明均基於 java8 版本。 注:本文基於ScheduledThreadPoolExecutor定時執行緒池類。 簡介 前面我們一起學習了普通任務、未來任務的執行流程,今天我們再來學習一種新的任務——定時任務。 定時任務是我們經常會用到的一
死磕java concurrent包系列(六)基於AQS解析訊號量Semaphore
Semaphore 之前分析AQS的時候,內部有兩種模式,獨佔模式和共享模式,前面的ReentrantLock都是使用獨佔模式,而Semaphore同樣作為一個基於AQS實現的併發元件,它是基於共享模式實現的,我們先看看它的使用場景 Semaphore共享鎖的基本使用 假設有20個人去銀行櫃面辦理業務,
死磕java concurrent包系列(一)從樂觀鎖、悲觀鎖到AtomicInteger的CAS演算法
前言 Java中有各式各樣的鎖,主流的鎖和概念如下: 這篇文章主要是為了讓大家通過樂觀鎖和悲觀鎖出發,理解CAS演算法,因為CAS是整個Concurrent包的基礎。 樂觀鎖和悲觀鎖 首先,java和資料庫中都有這種概念,他是一種從執行緒同步的角度上看的一種廣義上的概念: 悲觀鎖:悲觀的認為自己在使用資料的
死磕java concurrent包系列(三)基於ReentrantLock理解AQS的條件佇列
基於Codition分析AQS的條件佇列 前言 上一篇我們講了AQS中的同步佇列佇列,現在我們研究一下條件佇列。 在java中最常見的加鎖方式就是synchorinzed和Reentrantlock,我們都說Reentrantlock比synchorinzed更加靈活,其實就靈活在Reentrantlock中
死磕java concurrent包系列(五)基於AQS的條件佇列把LinkedBlockingQueue“扒光”
LinkedBlockingQueue的基礎 LinkedBlockingQueue是一個基於連結串列的阻塞佇列,實際使用上與ArrayBlockingQueue完全一樣,我們只需要把之前烤雞的例子中的Queue物件替換一下即可。如果對於ArrayBlockingQueue不熟悉,可以去看看https://
死磕 java執行緒系列之執行緒模型
(2)執行緒模型有哪些? (3)各語言使用的是哪種執行緒模型? 簡介 在Java中,我們平時所說的併發程式設計、多執行緒、共享資源等概念都是與執行緒相關的,這裡所說的執行緒實際上應該叫作“使用者執行緒”,而對應到作業系統,還有另外一種執行緒叫作“核心執行緒”。 使用者執行緒位於核心之上,它的管理無需核心支援
死磕 java執行緒系列之建立執行緒的8種方式
問題 (1)建立執行緒有哪幾種方式? (2)它們分別有什麼運用場景? 簡介 建立執行緒,是多執行緒程式設計中最基本的操作,彤哥總結了一下,大概有8種建立執行緒的方式,你知道嗎? 繼承Thread類並重寫run()方法 public class CreatingThread01 extends Thread
死磕 java執行緒系列之自己動手寫一個執行緒池
歡迎關注我的公眾號“彤哥讀原始碼”,檢視更多原始碼系列文章, 與彤哥一起暢遊原始碼的海洋。 (手機橫屏看原始碼更方便) 問題 (1)自己動手寫一個執行緒池需要考慮哪些因素? (2)自己動手寫的執行緒池如何測試? 簡介 執行緒池是Java併發程式設計中經常使用到的技術,那麼自己如何動手寫一個執行緒池呢?本
死磕 java執行緒系列之自己動手寫一個執行緒池(續)
(手機橫屏看原始碼更方便) 問題 (1)自己動手寫的執行緒池如何支援帶返回值的任務呢? (2)如果任務執行的過程中丟擲異常了該
死磕 java執行緒系列之執行緒池深入解析——體系結構
(手機橫屏看原始碼更方便) 注:java原始碼分析部分如無特殊說明均基於 java8 版本。 簡介 Java的執行緒池是塊硬骨頭,對執行緒池的原始碼做深入研究不僅能提高對Java整個併發程式設計的理解,也能提高自己在面試中的表現,增加被錄取的可能性。 本系列將分成很多個章節,本章作為執行緒池的第一章將對
死磕 java執行緒系列之執行緒池深入解析——生命週期
(手機橫屏看原始碼更方便) 注:java原始碼分析部分如無特殊說明均基於 java8 版本。 注:執行緒池原始碼部分如無特殊說明均指ThreadPoolExecutor類。 簡介 上一章我們一起重溫了下執行緒的生命週期(六種狀態還記得不?),但是你知不知道其實執行緒池也是有生命週期的呢?! 問題 (1)
死磕 java執行緒系列之終篇
(手機橫屏看原始碼更方便) 簡介 執行緒系列我們基本就學完了,這一個系列我們基本都是圍繞著執行緒池在講,其實關於執行緒還有很多東西可以講,後面有機會我們再補充進來。當然,如果你有什麼好的想法,也可以公從號右下角聯絡我。 重要知識點 直接上圖,看著這張圖我相信你能夠回憶起很多東西,也可以看著這張圖來自己提
【死磕Java併發】-----J.U.C之AQS:阻塞和喚醒執行緒
此篇部落格所有原始碼均來自JDK 1.8 線上程獲取同步狀態時如果獲取失敗,則加入CLH同步佇列,通過通過自旋的方式不斷獲取同步狀態,但是在自旋的過程中則需要判斷當前執行緒是否需要阻塞,其主要方法在acquireQueued(): if (sho
【紮實基本功】Java基礎教程系列之多執行緒
1. 多執行緒的概念 1.1 程序、執行緒、多程序的概念 程序:正在進行中的程式(直譯)。 執行緒是程式執行的一條路徑, 一個程序中可以包含多條執行緒。 一個應用程式可以理解成就是一個程序。 多執行緒併發執行可以提高程式的效率, 可以同時完成多項工作。 1.
Java 併發程式設計系列之帶你瞭解多執行緒
早期的計算機不包含作業系統,它們從頭到尾執行一個程式,這個程式可以訪問計算機中的所有資源。在這種情況下,每次都只能執行一個程式,對於昂貴的計算機資源來說是一種嚴重的浪費。 作業系統出現後,計算機可以執行多個程式,不同的程式在單獨的程序中執行。作業系統負責為各個獨
java多執行緒系列之模式|第一篇-Guarded Suspension pattern
Guarded Suspension pattern模式 作者注:該系列文章基於《java執行緒設計模式》撰寫,只用於學習和交流。 定義:多執行緒執行,當前執行緒沒有達到警戒條件時,執行緒會進入等待直到
java多執行緒系列之模式|第三篇: Producer-Consumer pattern
生產者-消費者模式 含義:顧名思義,生產者用來生產資料,可能有一到多個,消費者用來消費資料,也可能有多個,中間會有一個“橋樑參與者”,作為資料的存放以及執行緒之間的同步和協調。 範例程式行為: 廚師(MakerThread)做蛋糕,做好後放在桌子(Table)上 桌子
多執行緒系列之 java多執行緒的個人理解(二)
前言:上一篇多執行緒系列之 java多執行緒的個人理解(一) 講到了執行緒、程序、多執行緒的基本概念,以及多執行緒在java中的基本實現方式,本篇主要接著上一篇繼續講述多執行緒在實際專案中的應用以及遇到的諸多問題和解決方案