【本人禿頂程式設計師】Java併發:Semaphore訊號量原始碼分析
←←←←←←←←←←←← 快,點關注!
JUC 中 Semaphore 的使用與原理分析,Semaphore 也是 Java 中的一個同步器,與 CountDownLatch 和 CycleBarrier 不同在於它內部的計數器是遞增的,那麼,Semaphore 的內部實現是怎樣的呢?
Semaphore 訊號量也是Java 中一個同步容器,與CountDownLatch 和 CyclicBarrier 不同之處在於它內部的計數器是遞增的。為了能夠一覽Semaphore的內部結構,我們首先要看一下Semaphore的類圖,類圖,如下所示:
如上類圖可以知道Semaphoren內部還是使用AQS來實現的,Sync只是對AQS的一個修飾,並且Sync有兩個實現類,分別代表獲取訊號量的時候是否採取公平策略。建立Semaphore的時候會有一個變數標示是否使用公平策略,原始碼如下:
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new
NonfairSync(permits);
}
Sync(int permits) {
setState(permits);
}
如上面程式碼所示,Semaphore預設使用的是非公平策略,如果你需要公平策略,則可以使用帶兩個引數的建構函式來構造Semaphore物件,另外和CountDownLatch一樣,建構函式裡面傳遞的初始化訊號量個數 permits 被賦值給了AQS 的state狀態變數,也就是說這裡AQS的state值表示當前持有的訊號量個數。
接下來我們主要看看Semaphore實現的主要方法的原始碼,如下:
1、void acquire() 當前執行緒呼叫該方法的時候,目的是希望獲取一個訊號量資源,如果當前訊號量計數個數大於 0 ,並且當前執行緒獲取到了一個訊號量則該方法直接返回,當前訊號量的計數會減少 1 。否則會被放入AQS的阻塞佇列,當前執行緒被掛起,直到其他執行緒呼叫了release方法釋放了訊號量,並且當前執行緒通過競爭獲取到了改訊號量。當前執行緒被其他執行緒呼叫了 interrupte()方法中斷後,當前執行緒會丟擲 InterruptedException異常返回。原始碼如下:
public void acquire() throws InterruptedException { //傳遞引數為1,說明要獲取1個訊號量資源 sync.acquireSharedInterruptibly(1); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { //(1)如果執行緒被中斷,則丟擲中斷異常 if (Thread.interrupted()) throw new InterruptedException(); //(2)否者呼叫sync子類方法嘗試獲取,這裡根據建構函式確定使用公平策略 if (tryAcquireShared(arg) < 0) //如果獲取失敗則放入阻塞佇列,然後再次嘗試如果失敗則呼叫park方法掛起當前執行緒 doAcquireSharedInterruptibly(arg); }
如上程式碼可知,acquire()內部呼叫了sync的acquireSharedInterruptibly 方法,後者是對中斷響應的(如果當前執行緒被中斷,則丟擲中斷異常),嘗試獲取訊號量資源的AQS的方法tryAcquireShared 是由 sync 的子類實現,所以這裡就要分公平性了,這裡先討論非公平策略 NonfairSync 類的 tryAcquireShared 方法,原始碼如下:
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
//獲取當前訊號量值
int available = getState();
//計算當前剩餘值
int remaining = available - acquires;
//如果當前剩餘小於0或者CAS設定成功則返回
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
如上程式碼,先計算當前訊號量值(available)減去需要獲取的值(acquires) 得到剩餘的訊號量個數(remaining),如果剩餘值小於 0 說明當前訊號量個數滿足不了需求,則直接返回負數,然後當前執行緒會被放入AQS的阻塞佇列,當前執行緒被掛起。如果剩餘值大於 0 則使用CAS操作設定當前訊號量值為剩餘值,然後返回剩餘值。另外可以知道NonFairSync是非公平性獲取的,是說先呼叫aquire方法獲取訊號量的執行緒不一定比後來者先獲取鎖。
接下來我們要看看公平性的FairSync 類是如何保證公平性的,原始碼如下:
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 || compareAndSetState(available, remaining))
return remaining;
}
}
可以知道公平性還是靠 hasQueuedPredecessors 這個方法來做的,以前的隨筆已經講過公平性是看當前執行緒節點是否有前驅節點也在等待獲取該資源,如果是則自己放棄獲取的權力,然後當前執行緒會被放入AQS阻塞佇列,否則就去獲取。hasQueuedPredecessors原始碼如下:
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}
如上面程式碼所示,如果當前執行緒節點有前驅節點則返回true,否則如果當前AQS佇列為空 或者 當前執行緒節點是AQS的第一個節點則返回 false ,其中,如果 h == t 則說明當前佇列為空則直接返回 false,如果 h !=t 並且 s == null 說明有一個元素將要作為AQS的第一個節點入佇列(回顧下 enq 函式第一個元素入佇列是兩步操作,首先建立一個哨兵頭節點,然後第一個元素插入到哨兵節點後面),那麼返回 true,如果 h !=t 並且 s != null 並且 s.thread != Thread.currentThread() 則說明佇列裡面的第一個元素不是當前執行緒則返回 true。
2、void acquire(int permits) 該方法與 acquire() 不同在與後者只需要獲取一個訊號量值,而前者則獲取指定 permits 個,原始碼如下:
public void acquire(int permits) throws InterruptedException {
if (permits < 0)
throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
3、void acquireUninterruptibly() 該方法與 acquire() 類似,不同之處在於該方法對中斷不響應,也就是噹噹前執行緒呼叫了 acquireUninterruptibly 獲取資源過程中(包含被阻塞後)其它執行緒呼叫了當前執行緒的 interrupt()方法設定了當前執行緒的中斷標誌當前執行緒並不會丟擲 InterruptedException 異常而返回。原始碼如下:
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
4、void acquireUninterruptibly(int permits) 該方法與 acquire(int permits) 不同在於該方法對中斷不響應。原始碼如如下:
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}
5、void release() 該方法作用是把當前 semaphore物件的訊號量值增加 1 ,如果當前有執行緒因為呼叫 acquire 方法被阻塞放入了 AQS的阻塞佇列,則會根據公平策略選擇一個執行緒進行啟用,啟用的執行緒會嘗試獲取剛增加的訊號量,原始碼如下:
public void release() {
//(1)arg=1
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
//(2)嘗試釋放資源
if (tryReleaseShared(arg)) {
//(3)資源釋放成功則呼叫park喚醒AQS佇列裡面最先掛起的執行緒 doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
//(4)獲取當前訊號量值
int current = getState();
//(5)當前訊號量值增加releases,這裡為增加1
int next = current + releases;
if (next < current) // 移除處理
throw new Error("Maximum permit count exceeded");
//(6)使用cas保證更新訊號量值的原子性
if (compareAndSetState(current, next))
return true;
}
}
如上面程式碼可以看到 release()方法中對 sync.releaseShared(1),可以知道release方法每次只會對訊號量值增加 1 ,tryReleaseShared方法是無限迴圈,使用CAS保證了 release 方法對訊號量遞增 1 的原子性操作。當tryReleaseShared 方法增加訊號量成功後會執行程式碼(3),呼叫AQS的方法來啟用因為呼叫acquire方法而被阻塞的執行緒。
6、void release(int permits) 該方法與不帶引數的不同之處在於前者每次呼叫會在訊號量值原來基礎上增加 permits,而後者每次增加 1。原始碼如下:
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
另外注意到這裡呼叫的是 sync.releaseShared 是共享方法,這說明該訊號量是執行緒共享的,訊號量沒有和固定執行緒繫結,多個執行緒可以同時使用CAS去更新訊號量的值而不會阻塞。
到目前已經知道了其原理,接下來用一個例子來加深對Semaphore的理解,例子如下:
package com.hjc;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
* Created by cong on 2018/7/8.
*/
public class SemaphoreTest {
// 建立一個Semaphore例項
private static volatile Semaphore emaphore = new Semaphore(0);
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(2);
// 加入執行緒A到執行緒池
executorService.submit(new Runnable() {
public void run() {
try {
System.out.println(Thread.currentThread() + " over");
semaphore.release();
} catch (Exception e) {
e.printStackTrace();
}
}
});
// 加入執行緒B到執行緒池
executorService.submit(new Runnable() {
public void run() {
try {
System.out.println(Thread.currentThread() + " over");
semaphore.release();
}catch (Exception e) {
e.printStackTrace();
}
}
});
// 等待子執行緒執行完畢,返回
semaphore.acquire(2);
System.out.println("all child thread over!");
//關閉執行緒池
executorService.shutdown();
}
}
執行結果如下:
Thread[pool-1-thread-1,5,main] over
Thread[pool-1-thread-2,5,main] over
all child thread over!
類似於 CountDownLatch,上面我們的例子也是在主執行緒中開啟兩個子執行緒進行執行,等所有子執行緒執行完畢後主執行緒在繼續向下執行。
如上程式碼首先首先建立了一個訊號量例項,建構函式的入參為 0,說明當前訊號量計數器為 0,然後 main 函式新增兩個執行緒任務到執行緒池,每個執行緒內部呼叫了訊號量的 release 方法,相當於計數值遞增一,最後在 main 執行緒裡面呼叫訊號量的 acquire 方法,引數傳遞為 2 說明呼叫 acquire 方法的執行緒會一直阻塞,直到訊號量的計數變為 2 時才會返回。
看到這裡也就明白了,如果構造 Semaphore 時候傳遞的引數為 N,在 M 個執行緒中呼叫了該訊號量的 release 方法,那麼在呼叫 acquire 對 M 個執行緒進行同步時候傳遞的引數應該是 M+N;
對CountDownLatch,CyclicBarrier,Semaphored這三者之間的比較總結:
1、CountDownLatch 通過計數器提供了更靈活的控制,只要檢測到計數器為 0,而不管當前執行緒是否結束呼叫 await 的執行緒就可以往下執行,相比使用 jion 必須等待執行緒執行完畢後主執行緒才會繼續向下執行更靈活。
2、CyclicBarrier 也可以達到 CountDownLatch 的效果,但是後者當計數器變為 0 後,就不能在被複用,而前者則使用 reset 方法可以重置後複用,前者對同一個演算法但是輸入引數不同的類似場景下比較適用。
3、而 semaphore 採用了訊號量遞增的策略,一開始並不需要關心需要同步的執行緒個數,等呼叫 aquire 時候在指定需要同步個數,並且提供了獲取訊號量的公平性策略。
歡迎大家加入粉絲群:963944895,群內免費分享Spring框架、Mybatis框架SpringBoot框架、SpringMVC框架、SpringCloud微服務、Dubbo框架、Redis快取、RabbitMq訊息、JVM調優、Tomcat容器、MySQL資料庫教學視訊及架構學習思維導圖