1. 程式人生 > >Java併發包原始碼學習系列:同步元件Semaphore原始碼解析

Java併發包原始碼學習系列:同步元件Semaphore原始碼解析

[toc] ## Semaphore概述及案例學習 Semaphore訊號量用來**控制同時訪問特定資源的執行緒數量**,它通過協調各個執行緒,以保證合理地使用公共資源。 ```java public class SemaphoreTest { private static final int THREAD_COUNT = 30; private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT); private static Semaphore s = new Semaphore(10); //10個許可證數量,最大併發數為10 public static void main(String[] args) { for(int i = 0; i < THREAD_COUNT; i ++){ //執行30個執行緒 threadPool.execute(new Runnable() { @Override public void run() { s.tryAcquire(); //嘗試獲取一個許可證 System.out.println("save data"); s.release(); //使用完之後歸還許可證 } }); } threadPool.shutdown(); } } ``` - 建立一個大小為30的執行緒池,但是訊號量規定在10,保證許可證數量為10。 - 每次執行緒呼叫`tryAcquire()`或者`acquire()`方法都會原子性的遞減許可證的數量,release()會原子性遞增許可證數量。 ## 類圖結構及重要欄位 ![](https://img2020.cnblogs.com/blog/1771072/202102/1771072-20210221200730681-1085126296.png) ```java public class Semaphore implements java.io.Serializable { private static final long serialVersionUID = -3222578661600680210L; /** All mechanics via AbstractQueuedSynchronizer subclass */ private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer { // permits指定初始化訊號量個數 Sync(int permits) { setState(permits); } // ... } static final class NonfairSync extends Sync {...} static final class FairSync extends Sync {...} // 預設採用非公平策略 public Semaphore(int permits) { sync = new NonfairSync(permits); } // 可以指定公平策略 public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); } //... } ``` - 基於AQS,類似於ReentrantLock,Sync繼承自AQS,有公平策略和非公平策略兩種實現。 - 類似於CountDownLatch,state在這裡也是通過構造器指定,表示初始化訊號量的個數。 > 本篇文章閱讀需要建立在一定的AQS基礎之上,這邊推薦幾篇前置文章,可以瞅一眼: > > - [Java併發包原始碼學習系列:AbstractQueuedSynchronizer](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112254373) > - [Java併發包原始碼學習系列:CLH同步佇列及同步資源獲取與釋放](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112301359) > - [Java併發包原始碼學習系列:AQS共享式與獨佔式獲取與釋放資源的區別](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112386838) >
- [Java併發包原始碼學習系列:詳解Condition條件佇列、signal和await](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112727669) > - [Java併發包原始碼學習系列:掛起與喚醒執行緒LockSupport工具類](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112757098) ## void acquire() 呼叫該方法時,表示希望獲取一個訊號量資源,相當於`acquire(1)`。 如果當前訊號量個數大於0,CAS將當前訊號量值減1,成功後直接返回。 如果當前訊號量個數等於0,則當前執行緒將被置入AQS的阻塞佇列。 該方法是響應中斷的,其他執行緒呼叫了該執行緒的`interrupt()`方法,將會丟擲中斷異常返回。 ```java // Semaphore.java public void acquire() throws InterruptedException { // 傳遞的 arg 為 1 , 獲取1個訊號量資源 sync.acquireSharedInterruptibly(1); } // AQS.java public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // 執行緒被 中斷, 丟擲中斷異常 if (Thread.interrupted()) throw new InterruptedException(); // 子類實現, 公平和非公平兩種策略 if (tryAcquireShared(arg) < 0) // 如果獲取失敗, 則置入阻塞佇列, // 再次進行嘗試, 嘗試失敗則掛起當前執行緒 doAcquireSharedInterruptibly(arg); } ``` ### 非公平 ```java static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { // 這裡直接呼叫Sync定義的 非公平共享模式獲取方法 return nonfairTryAcquireShared(acquires); } } abstract static class Sync extends AbstractQueuedSynchronizer { final int nonfairTryAcquireShared(int acquires) { for (;;) { // 獲取當前訊號量的值 int available = getState(); // 減去需要獲取的值, 得到剩餘的訊號量個數 int remaining = available - acquires; // 不剩了,表示當前訊號量個數不能滿足需求, 返回負數, 執行緒置入AQS阻塞 // 還有的剩, CAS設定當前訊號量值為剩餘值, 並返回剩餘值 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } } ``` 你會發現,非公平策略是無法保證【AQS佇列中阻塞的執行緒】和【當前執行緒】獲取的順序的,當前執行緒是有可能在排隊的執行緒之前就拿到資源,產生插隊現象。 公平策略就不一樣了,它會通過`hasQueuedPredecessors()`方法看看佇列中是否存在前驅節點,以保證公平性。 ### 公平策略 ```java static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { for (;;) { // 如果佇列中在此之前已經有執行緒在排隊了,直接放棄獲取 if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } } ``` ## void acquire(int permits) 在acquire()的基礎上,指定了獲取訊號量的數量permits。 ```java public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); } ``` ## void acquireUninterruptibly() 該方法與`acquire()`類似,但是不響應中斷。 ```java public void acquireUninterruptibly() { sync.acquireShared(1); } public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } ``` ## void acquireUninterruptibly(int permits) 該方法與`acquire(permits)`類似,但是不響應中斷。 ```java public void acquireUninterruptibly(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.acquireShared(permits); } ``` ## boolean tryAcquire() tryAcquire和acquire非公平策略公用一個邏輯,但是區別在於,如果獲取訊號量失敗,或者CAS失敗,將會直接返回false,而不會置入阻塞佇列中。 > 一般try開頭的方法的特點就是這樣,嘗試一下,成功是最好,失敗也不至於被阻塞,而是立刻返回false。 ```java public boolean tryAcquire() { return sync.nonfairTryAcquireShared(1) >= 0; } abstract static class Sync extends AbstractQueuedSynchronizer { final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } } ``` ## boolean tryAcquire(int permits) 相比於普通的`tryAcquire()`,指定了permits的值。 ```java public boolean tryAcquire(int permits) { if (permits < 0) throw new IllegalArgumentException(); return sync.nonfairTryAcquireShared(permits) >= 0; } ``` ## boolean tryAcquire(int permits, long timeout, TimeUnit unit) 相比於`tryAcquire(int permits)`,增加了超時控制。 ```java public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout)); } ``` ## void release() 將訊號量值加1,如果有執行緒因為呼叫acquire方法而被阻塞在AQS阻塞佇列中,將根據公平策略選擇一個訊號量個數滿足需求的執行緒喚醒,執行緒喚醒後也會嘗試獲取新增的訊號量。 參考文章:[Java併發包原始碼學習系列:AQS共享模式獲取與釋放資源](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112386838) ```java // Semaphore.java public void release() { sync.releaseShared(1); } // AQS.java public final boolean releaseShared(int arg) { // 嘗試釋放鎖 if (tryReleaseShared(arg)) { // 釋放成功, 喚醒AQS佇列裡面最先掛起的執行緒 // https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112386838 doReleaseShared(); return true; } return false; } // Semaphore#Sync.java abstract static class Sync extends AbstractQueuedSynchronizer { protected final boolean tryReleaseShared(int releases) { for (;;) { // 獲取當前訊號量 int current = getState(); // 期望加上releases int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); // CAS操作,更新 if (compareAndSetState(current, next)) return true; } } } ``` ## void release(int permits) 和`release()`相比指定了permits的值。 ```java public void release(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits); } ``` ## 其他方法 Semaphore還提供其他一些方法,實現比較簡單,這邊就簡單寫一下吧: ```java // 返回此訊號量中當前可用的許可證數量, 其實就是得到當前的 state值 getState() public int availablePermits() { return sync.getPermits(); } // 將state更新為0, 返回0 public int drainPermits() { return sync.drainPermits(); } // 減少reduction個許可證 protected void reducePermits(int reduction) { if (reduction < 0) throw new IllegalArgumentException(); sync.reducePermits(reduction); } // 判斷公平策略 public boolean isFair() { return sync instanceof FairSync; } // 判斷是否有執行緒證在等待獲取許可證 public final boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } // 返回正在等待獲取許可證的執行緒數 public final int getQueueLength() { return sync.getQueueLength(); } // 返回所有等待獲取許可證的執行緒集合 protected Collection getQueuedThreads() { return sync.getQueuedThreads(); } ``` ## 總結 Semaphore訊號量用來控制同時訪問特定資源的執行緒數量,它通過協調各個執行緒,以保證合理地使用公共資源。 - 基於AQS,類似於ReentrantLock,Sync繼承自AQS,有公平策略和非公平策略兩種實現。 - 類似於CountDownLatch,state在這裡也是通過構造器指定,表示初始化訊號量的個數。 每次執行緒呼叫`tryAcquire()`或者`acquire()`方法都會原子性的遞減許可證的數量,release()會原子性遞增許可證數量,只要有許可證就可以重複使用。 ## 參考閱讀 - 《Java併發程式設計之美》 - 《Java併發程式設計的