1. 程式人生 > >阿里面試P6以上必問:併發程式設計

阿里面試P6以上必問:併發程式設計

Java併發程式設計在實際的工作中應用廣泛,有時候需要通過多執行緒去非同步做一些事情,有時候需要通過多執行緒提升一個任務執行的效率。網際網路公司面試最常問到的點。本文有點長,程式碼比較多,請耐心看完,提升是需要一個學習的過程。

關鍵概念

上下文切換

  1. 概念:CPU通過時間片演算法,給可執行的執行緒分配執行時間,在不同執行緒之間的切換時需要將當前執行緒的狀態儲存並回復將要執行的執行緒狀態資訊,這個過程就是上下文切換。

  2. 如何減少或避免上下文切換?

  • 無鎖併發程式設計

  • CAS演算法

  • 使用最少執行緒

  • 協程

死鎖

  1. 概念:兩個或多個執行緒持有對方正在等待的鎖

  2. 如何避免死鎖?

  • 避免一個執行緒同時獲取多個鎖

  • 避免一個執行緒在鎖內同時佔用多個資源,儘量保證每個鎖只佔用一個資源

  • 嘗試使用定時鎖

  • 對於資料庫鎖,加鎖和解鎖必須在一個數據庫連線裡

Java併發的底層機制

volatile

  1. 作用:在多處理器開發中保證多執行緒之間的共享變數的可見性,即一個執行緒修改該變數的值時,其他的執行緒可以立即看到該變數最新的值。

  2. 原理:對被volatile修飾的變數進行寫操作時,會做如下兩個事情

  • 將當前處理器快取行的資料寫到系統記憶體;

  • 使得其他CPU裡快取了該記憶體地址的資料無效

  1. 使用要點:

  • volatile只能保證可見性,無法保證同步性。舉個例子:如果針對某個變數的改變後的值依賴於上次改變的值,使用volatile就無法保證併發安全了

synchronized

  1. 定義:synchronized是Java多執行緒之間的一種通訊方式。synchronized的具體應用有三種:

  • 對於普通同步方法,鎖是當前例項物件

  • 對於靜態同步方法,鎖是當前類的Class物件

  • 對於同步程式碼塊,鎖是synchronized括號裡配置的物件

  1. 使用要點:

  • 構造方法不能用synchronized修飾

  • 推薦儘量減小鎖的粒度,例如,使用同步程式碼塊可以滿足需求就不需要使用同步方法

  • 如果可以確認應用中的所有鎖在大多數情況下都由不同的執行緒競爭,可以通過-XX:+UseBiasedLocking禁用偏向鎖,提升效能。

  1. 原理:介紹兩個概念,Monitor Record(Thread類的私有資料結構)和Java物件頭,關係是:Java物件頭中儲存了Monitor Record的地址,Monitor Record中記錄了持有它的執行緒。

  • monitor:monitor不是一個特殊的物件,是一種方法或機制,Java通過monitor來控制對某個物件的訪問。Java中的每個物件都和一個monitor相關聯。在同一個時刻,只有一個執行緒(Thread)可以鎖定一個monitor。當某個monitor被一個執行緒鎖定時,其他試圖鎖定這個monitor的執行緒只能block等待。

  • 物件頭:synchronized的鎖狀態描述在Java物件的頭部。物件頭中包括Mark word和Klass Word。

  1. 在32位虛擬機器中,整個物件頭大小是64bits(即8位元組),Mark Word和Klass Word分別佔用4位元組。

  2. 鎖狀態:Java中的鎖按照級別從低到高有四種,無鎖狀態——>偏向鎖——>輕量級鎖——>重量級鎖。偏向鎖是依賴Mark Word中的一個指向當前執行緒的欄位來標識該鎖的持有者是否是當前執行緒,如果是則直接進入同步程式碼塊;假設禁用了偏向鎖,輕量級鎖指的是兩個執行緒獲取鎖,一個獲取到,另一個獲取不成功的狀態,首先會CAS自旋獲取鎖,如果CAS自旋獲取失敗,該輕量級鎖就會膨脹為重量級鎖,當前獲取鎖失敗的執行緒進入阻塞狀態。

  3. 鎖升級的過程,只會從低到高,不會從高到低,避免不必要的資源浪費。舉個例子,如果一個鎖的狀態已經達到重量級鎖,後面再來競爭這個鎖的執行緒都會直接進入阻塞,不會再進行CAS自旋。參考資料7中提供的一張圖很精緻,我放在這裡:

阿里面試P4以上必問:併發程式設計

Object Header(32位虛擬機器)

阿里面試P4以上必問:併發程式設計

原子操作

CPU級別的原子操作

在CPU級別實現原子操作需要依靠CPU指令完成,CPU指令通過匯流排操作記憶體中的資料,因此在CPU中有兩個方式:

  1. 鎖匯流排:利用LOCK指令向匯流排發出訊號,實現一個

  2. 鎖快取:在某個一時刻,只需要保證對某個記憶體地址的操作是原子性的;

JAVA中的原子操作

在Java中可以通過CAS和鎖來實現原子操作。

  1. 使用CAS實現原子操作,從Java1.5開始,java.lang.concurrent包裡提供了很多類來支援原子操作,例如AotmicIntenger、AtomicLong,這些類可以以原子的方式將變數的當前值加1或減1;

  2. 使用鎖實現原子操作,鎖機制確保只有持有鎖的執行緒才能操作指定的變數;

都說到這裡了,面試必不可少問的---併發程式設計之執行緒池的使用及擴充套件和優化

現在可以說是程式碼時間了

簡而言之,在使用執行緒池後,建立執行緒便處理從執行緒池獲得空閒執行緒,關閉執行緒變成了向池子歸還執行緒。也就是說,提高了執行緒的複用。

而 JDK 在 1.5 之後為我提供了現成的執行緒池工具,我們今天就來學習看看如何使用他們。

  1. Executors 執行緒池工廠能建立哪些執行緒池

  2. 如何手動建立執行緒池

  3. 如何擴充套件執行緒池

  4. 如何優化執行緒池的異常資訊

  5. 如何設計執行緒池中的執行緒數量

1. Executors 執行緒池工廠能建立哪些執行緒池

先來一個最簡單的執行緒池使用例子:

staticclass MyTask implements Runnable {

@Override

public void run() {

System.out

.println(System.currentTimeMillis() + ": Thread ID :" + Thread.currentThread().getId());

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

public static void main(String[] args) {

MyTask myTask = new MyTask();

ExecutorService service1 = Executors.newFixedThreadPool(5);

for (int i = 0; i < 10; i++) {

service1.submit(myTask);

}

service1.shutdown();

}

執行結果:

阿里面試P4以上必問:併發程式設計

我們建立了一個執行緒池例項,並設定預設執行緒數量為5,並向執行緒池提交了10任務,分別列印當前毫秒時間和執行緒ID,從結果中,我們可以看到結果中有5個相同 id 的執行緒列印了毫秒時間。

這是最簡單的例子。

接下來我們講講其他的執行緒建立方式。

1. 固定執行緒池ExecutorService service1 = Executors.newFixedThreadPool(5);該方法返回一個固定執行緒數量的執行緒池。該執行緒池中的執行緒數量始終不變。當有一個新的任務提交時,執行緒池中若有空閒執行緒,則立即執行,若沒有,則新的任務會被暫存在一個任務佇列(預設無界佇列 int 最大數)中,待有執行緒空閒時,便處理在任務佇列中的任務。

2. 單例執行緒池ExecutorService service3 = Executors.newSingleThreadExecutor();該方法返回一個只有一個執行緒的執行緒池。若多餘一個任務被提交到該執行緒池,任務會被儲存在一個任務佇列(預設無界佇列 int 最大數)中,待執行緒空閒,按先入先出的順序執行佇列中的任務。

3. 快取執行緒池ExecutorService service2 = Executors.newCachedThreadPool();該方法返回一個可根據實際情況調整執行緒數量的執行緒池,執行緒池的執行緒數量不確定,但若有空閒執行緒可以複用,則會優先使用可複用的執行緒,所有執行緒均在工作,如果有新的任務提交,則會建立新的執行緒處理任務。所有執行緒在當前任務執行完畢後,將返回執行緒池進行復用。

4. 任務呼叫執行緒池ExecutorService service4 = Executors.newScheduledThreadPool(2);該方法也返回一個 ScheduledThreadPoolExecutor 物件,該執行緒池可以指定執行緒數量。

前3個執行緒的用法沒什麼差異,關鍵是第四個,雖然執行緒任務排程框架很多,但是我們仍然可以學習該執行緒池。如何使用呢?下面來個例子:

class A {

public static void main(String[] args) {

ScheduledThreadPoolExecutor service4 = (ScheduledThreadPoolExecutor) Executors

.newScheduledThreadPool(2);

// 如果前面的任務沒有完成,則排程也不會啟動

service4.scheduleAtFixedRate(new Runnable() {

@Override

public void run() {

try {

// 如果任務執行時間大於間隔時間,那麼就以執行時間為準(防止任務出現堆疊)。

Thread.sleep(10000);

System.out.println(System.currentTimeMillis() / 1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

}// initialDelay(初始延遲) 表示第一次延時時間 ; period 表示間隔時間

}, 0, 2, TimeUnit.SECONDS);

service4.scheduleWithFixedDelay(new Runnable() {

@Override

public void run() {

try {

Thread.sleep(5000);

System.out.println(System.currentTimeMillis() / 1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

}// initialDelay(初始延遲) 表示延時時間;delay + 任務執行時間 = 等於間隔時間 period

}, 0, 2, TimeUnit.SECONDS);

// 在給定時間,對任務進行一次排程

service4.schedule(new Runnable() {

@Override

public void run() {

System.out.println("5 秒之後執行 schedule");

}

}, 5, TimeUnit.SECONDS);

}

}

}

上面的程式碼建立了一個 ScheduledThreadPoolExecutor 任務排程執行緒池,分別呼叫了3個方法,需要著重解釋 scheduleAtFixedRate 和 scheduleWithFixedDelay 方法,這兩個方法的作用很相似,唯一的區別就是他們執行人物的間隔時間的計算方式,前者時間間隔演算法是根據指定的 period 時間和任務執行時間中取時間長的,後者取的是指定的 delay 時間 + 任務執行時間。如果同學們有興趣,可以將上面的程式碼跑跑看。一樣便能看出端倪。

好了,JDK 給我們封裝了建立執行緒池的 4 個方法,但是,請注意,由於這些方法高度封裝,因此,如果使用不當,出了問題將無從排查,因此,我建議,程式設計師應到自己手動建立執行緒池,而手動建立的前提就是高度瞭解執行緒池的引數設定。那麼我們就來看看如何手動建立執行緒池。

2. 如何手動建立執行緒池

下面是一個手動建立執行緒池的範本:

/**

* 預設5條執行緒(預設數量,即最少數量),

* 最大20執行緒(指定了執行緒池中的最大執行緒數量),

* 空閒時間0秒(當執行緒池梳理超過核心數量時,多餘的空閒時間的存活時間,即超過核心執行緒數量的空閒執行緒,在多長時間內,會被銷燬),

* 等待佇列長度1024,

* 執行緒名稱[MXR-Task-%d],方便回溯,

* 拒絕策略:當任務佇列已滿,丟擲RejectedExecutionException

* 異常。

*/

private static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 20, 0L,

TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024)

, new ThreadFactoryBuilder().setNameFormat("My-Task-%d").build()

, new AbortPolicy()

);

我們看到,ThreadPoolExecutor 也就是執行緒池有 7 個引數,我們一起來好好看看:

  1. corePoolSize 執行緒池中核心執行緒數量

  2. maximumPoolSize 最大執行緒數量

  3. keepAliveTime 空閒時間(當執行緒池梳理超過核心數量時,多餘的空閒時間的存活時間,即超過核心執行緒數量的空閒執行緒,在多長時間內,會被銷燬)

  4. unit 時間單位

  5. workQueue 當核心執行緒工作已滿,需要儲存任務的佇列

  6. threadFactory 建立執行緒的工廠

  7. handler 當佇列滿了之後的拒絕策略

前面幾個引數我們就不講了,很簡單,主要是後面幾個引數,佇列,執行緒工廠,拒絕策略。

我們先看看佇列,執行緒池預設提供了 4 個佇列。

  1. 無界佇列: 預設大小 int 最大值,因此可能會耗盡系統記憶體,引起OOM,非常危險。

  2. 直接提交的佇列 : 沒有容量,不會儲存,直接建立新的執行緒,因此需要設定很大的執行緒池數。否則容易執行拒絕策略,也很危險。

  3. 有界佇列:如果core滿了,則儲存在佇列中,如果core滿了且佇列滿了,則建立執行緒,直到maximumPoolSize 到了,如果佇列滿了且最大執行緒數已經到了,則執行拒絕策略。

  4. 優先順序佇列:按照優先順序執行任務。也可以設定大小。

樓主在自己的專案中使用了無界佇列,但是設定了任務大小,1024。如果你的任務很多,建議分為多個執行緒池。不要把雞蛋放在一個籃子裡。

再看看拒絕策略,什麼是拒絕策略呢?當佇列滿了,如何處理那些仍然提交的任務。JDK 預設有4種策略。

  1. AbortPolicy :直接丟擲異常,阻止系統正常工作.

  2. CallerRunsPolicy : 只要執行緒池未關閉,該策略直接在呼叫者執行緒中,運行當前被丟棄的任務。顯然這樣做不會真的丟棄任務,但是,任務提交執行緒的效能極有可能會急劇下降。

  3. DiscardOldestPolicy: 該策略將丟棄最老的一個請求,也就是即將被執行的一個任務,並嘗試再次提交當前任務.

  4. DiscardPolicy: 該策略默默地丟棄無法處理的任務,不予任何處理,如果允許任務丟失,我覺得這是最好的方案.

當然,如果你不滿意JDK提供的拒絕策略,可以自己實現,只需要實現 RejectedExecutionHandler 介面,並重寫 rejectedExecution 方法即可。

最後,執行緒工廠,執行緒池的所有執行緒都由執行緒工廠來建立,而預設的執行緒工廠太過單一,我們看看預設的執行緒工廠是如何建立執行緒的:

/**

* The default thread factory

*/

static class DefaultThreadFactory implements ThreadFactory {

private static final AtomicInteger poolNumber = new AtomicInteger(1);

private final ThreadGroup group;

private final AtomicInteger threadNumber = new AtomicInteger(1);

private final String namePrefix;

DefaultThreadFactory() {

SecurityManager s = System.getSecurityManager();

group = (s != null) ? s.getThreadGroup() :

Thread.currentThread().getThreadGroup();

namePrefix = "pool-" +

poolNumber.getAndIncrement() +

"-thread-";

}

public Thread newThread(Runnable r) {

Thread t = new Thread(group, r,

namePrefix + threadNumber.getAndIncrement(),

0);

if (t.isDaemon())

t.setDaemon(false);

if (t.getPriority() != Thread.NORM_PRIORITY)

t.setPriority(Thread.NORM_PRIORITY);

return t;

}

}

可以看到,執行緒名稱為 pool- + 執行緒池編號 + -thread- + 執行緒編號 。設定為非守護執行緒。優先順序為預設。

如果我們想修改名稱呢?對,實現 ThreadFactory 介面,重寫 newThread 方法即可。但是已經有人造好輪子了, 比如我們的例子中使用的 google 的 guaua 提供的 ThreadFactoryBuilder 工廠。可以自定義執行緒名稱,是否守護,優先順序,異常處理等等,功能強大。

3. 如何擴充套件執行緒池

那麼我們能擴充套件執行緒池的功能嗎?比如記錄執行緒任務的執行時間。實際上,JDK 的執行緒池已經為我們預留的介面,線上程池核心方法中,有2 個方法是空的,就是給我們預留的。還有一個執行緒池退出時會呼叫的方法。我們看看例子:

/**

* 如何擴充套件執行緒池,重寫 beforeExecute, afterExecute, terminated 方法,這三個方法預設是空的。

*

* 可以監控每個執行緒任務執行的開始和結束時間,或者自定義一些增強。

*

* 在 Worker 的 runWork 方法中,會呼叫這些方法

*/

public class ExtendThreadPoolDemo {

static class MyTask implements Runnable {

String name;

public MyTask(String name) {

this.name = name;

}

@Override

public void run() {

System.out

.println("正在執行:Thread ID:" + Thread.currentThread().getId() + ", Task Name = " + name);

try {

Thread.sleep(100);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

public static void main(String[] args) throws InterruptedException {

ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,

new LinkedBlockingQueue<>()) {

@Override

protected void beforeExecute(Thread t, Runnable r) {

System.out.println("準備執行:" + ((MyTask) r).name);

}

@Override

protected void afterExecute(Runnable r, Throwable t) {

System.out.println("執行完成: " + ((MyTask) r).name);

}

@Override

protected void terminated() {

System.out.println("執行緒池退出");

}

};

for (int i = 0; i < 5; i++) {

MyTask myTask = new MyTask("TASK-GEYM-" + i);

es.execute(myTask);

Thread.sleep(10);

}

es.shutdown();

}

}

我們重寫了 beforeExecute 方法,也就是執行任務之前會呼叫該方法,而 afterExecute 方法則是在任務執行完畢後會呼叫該方法。還有一個 terminated 方法,線上程池退出時會呼叫該方法。執行結果是什麼呢?

阿里面試P4以上必問:併發程式設計

可以看到,每個任務執行前後都會呼叫 before 和 after 方法。相當於執行了一個切面。而在呼叫 shutdown 方法後則會呼叫 terminated 方法。

4. 如何優化執行緒池的異常資訊

如何優化執行緒池的異常資訊? 在說這個問題之前,我們先說一個不容易發現的bug:

看程式碼:

public static void main(String[] args) throws ExecutionException, InterruptedException {

ThreadPoolExecutor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 0L,

TimeUnit.MILLISECONDS, new SynchronousQueue<>());

for (int i = 0; i < 5; i++) {

executor.submit(new DivTask(100, i));

}

}

static class DivTask implements Runnable {

int a, b;

public DivTask(int a, int b) {

this.a = a;

this.b = b;

}

@Override

public void run() {

double re = a / b;

System.out.println(re);

}

}

執行結果:

阿里面試P4以上必問:併發程式設計

注意:只有4個結果,其中一個結果被吞沒了,並且沒有任何資訊。為什麼呢?如果仔細看程式碼,會發現,在進行 100 / 0 的時候肯定會報錯的,但是卻沒有報錯資訊,令人頭痛,為什麼呢?實際上,如果你使用 execute 方法則會列印錯誤資訊,當你使用 submit 方法卻沒有呼叫它的get 方法,異常將會被吞沒,因為,如果發生了異常,異常是作為返回值返回的。

怎麼辦呢?我們當然可以使用 execute 方法,但是我們可以有另一種方式:重寫 submit 方法,樓主寫了一個例子,大家看一下:

static class TraceThreadPoolExecutor extends ThreadPoolExecutor {

public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,

TimeUnit unit, BlockingQueue<Runnable> workQueue) {

super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);

}

@Override

public void execute(Runnable command) {

// super.execute(command);

super.execute(wrap(command, clientTrace(), Thread.currentThread().getName()));

}

@Override

public Future<?> submit(Runnable task) {

// return super.submit(task);

return super.submit(wrap(task, clientTrace(), Thread.currentThread().getName()));

}

private Exception clientTrace() {

return new Exception("Client stack trace");

}

private Runnable wrap(final Runnable task, final Exception clientStack,

String clientThreaName) {

return new Runnable() {

@Override

public void run() {

try {

task.run();

} catch (Exception e) {

e.printStackTrace();

clientStack.printStackTrace();

throw e;

}

}

};

}

}

我們重寫了 submit 方法,封裝了異常資訊,如果發生了異常,將會列印堆疊資訊。我們看看使用重寫後的執行緒池後的結果是什麼?

阿里面試P4以上必問:併發程式設計

從結果中,我們清楚的看到了錯誤資訊的原因:by zero!並且堆疊資訊明確,方便排錯。優化了預設執行緒池的策略。

5. 如何設計執行緒池中的執行緒數量

執行緒池的大小對系統的效能有一定的影響,過大或者過小的執行緒數量都無法發揮最優的系統性能,但是執行緒池大小的確定也不需要做的非常精確。因為只要避免極大和極小兩種情況,執行緒池的大小對效能的影響都不會影響太大,一般來說,確定執行緒池的大小需要考慮CPU數量,記憶體大小等因素,在《Java Concurrency in Practice》 書中給出了一個估算執行緒池大小的經驗公式:

阿里面試P4以上必問:併發程式設計

公式還是有點複雜的,簡單來說,就是如果你是CPU密集型運算,那麼執行緒數量和CPU核心數相同就好,避免了大量無用的切換執行緒上下文,如果你是IO密集型的話,需要大量等待,那麼執行緒數可以設定的多一些,比如CPU核心乘以2.

至於如何獲取 CPU 核心數,Java 提供了一個方法:

Runtime.getRuntime().availableProcessors();

返回了CPU的核心數量。

總結

好了,到這裡,我們已經對高併發,如何使用執行緒池有了一個認識,這裡,樓主建議大家手動建立執行緒池,這樣對執行緒池中的各個引數可以有精準的瞭解,在對系統進行排錯或者調優的時候有好處。比如設定核心執行緒數多少合適,最大執行緒數,拒絕策略,執行緒工廠,佇列的大小和型別等等,也可以是G家的執行緒工廠自定義執行緒。

最後分享一個學習路線,也可以加群:433540541獲取相關資料。

阿里面試P4以上必問:併發程式設計

ps:更多架構好文關注架構師客棧,架構資料在公眾號中回覆獲取。不能說每天一篇架構好文,但是有時間肯定會更新哦。