【Java 併發筆記】Semaphore 相關整理
文前說明
作為碼農中的一員,需要不斷的學習,我工作之餘將一些分析總結和學習筆記寫成部落格與大家一起交流,也希望採用這種方式記錄自己的學習之旅。
本文僅供學習交流使用,侵權必刪。
不用於商業目的,轉載請註明出處。
1. 簡介
- Semaphore 類是一個計數訊號量,必須由獲取它的執行緒釋放, 通常用於限制可以訪問某些資源(物理或邏輯的)執行緒數目。
- 一個訊號量有且僅有 3 種操作,且它們全部是原子的。
- 初始化、增加和減少。
- 增加可以為一個程序解除阻塞。
- 減少可以讓一個程序進入阻塞。
- Semaphore 管理一系列許可證。
- 每個
acquire()
方法阻塞,直到有一個許可證可以獲得然後拿走一個許可證。 - 每個
release()
方法增加一個許可證,這可能會釋放一個阻塞的acquire()
方法。 - 不使用實際的許可物件,Semaphore 只對可用許可的號碼進行計數,並採取相應的行動。
- 每個
- Semaphore 在計數器不為 0 的時候對執行緒就放行,一旦達到 0,那麼所有請求資源的新執行緒都會被阻塞,包括增加請求到許可的執行緒,Semaphore 是不可重入的。
- 每一次請求一個許可都會導致計數器減少 1,同樣每次釋放一個許可都會導致計數器增加 1,一旦達到 0,新的許可請求執行緒將被掛起。
- Semaphore 有兩種模式, 公平模式 和 非公平模式 。
- 公平模式就是呼叫 acquire 的順序就是獲取許可證的順序,遵循 FIFO。
- 非公平模式是搶佔式的,也就是有可能一個新的獲取執行緒恰好在一個許可證釋放時得到了這個許可證,而前面還有等待的執行緒。
1.1 Semaphore 的應用場景
- 比如模擬一個停車場停車訊號,假設停車場只有兩個車位,一開始兩個車位都是空的。這時同時來了兩輛車,看門人允許它們進入停車場,然後放下車攔。以後來的車必須在入口等待,直到停車場中有車輛離開。這時,如果有一輛車離開停車場,看門人得知後,開啟車攔,放入一輛,如果又離開一輛,則又可以放入一輛,如此往復。
public class SemaphoreDemo { private static Semaphore s = new Semaphore(2); static class ParkTask implements Runnable { private String name; public ParkTask(String name) { this.name = name; } @Override public void run() { try { s.acquire(); System.out.println("Thread " + this.name + " start..."); TimeUnit.SECONDS.sleep(new Random().nextInt(10)); } catch (InterruptedException e) { e.printStackTrace(); } finally { s.release(); } } public static void main(String[] args) { ExecutorService pool = Executors.newCachedThreadPool(); pool.submit(new ParkTask("1")); pool.submit(new ParkTask("2")); pool.submit(new ParkTask("3")); pool.submit(new ParkTask("4")); pool.submit(new ParkTask("5")); pool.submit(new ParkTask("6")); pool.shutdown(); } } /** --- print --- Thread 2 start... Thread 6 start... Thread 3 start... Thread 1 start... Thread 4 start... Thread 5 start... */
2. Semaphore 原理

Semaphore 類圖
- Semaphore 通過使用內部類 Syn 繼承AQS 實現。
- 包含的方法函式。
// 建立具有給定的許可數和非公平的公平設定的 Semaphore。 Semaphore(int permits) // 建立具有給定的許可數和給定的公平設定的 Semaphore。 Semaphore(int permits, boolean fair) // 從此訊號量獲取一個許可,在提供一個許可前一直將執行緒阻塞,否則執行緒被中斷。 void acquire() // 從此訊號量獲取給定數目的許可,在提供這些許可前一直將執行緒阻塞,或者執行緒已被中斷。 void acquire(int permits) // 從此訊號量中獲取許可,在有可用的許可前將其阻塞。 void acquireUninterruptibly() // 從此訊號量獲取給定數目的許可,在提供這些許可前一直將執行緒阻塞。 void acquireUninterruptibly(int permits) // 返回此訊號量中當前可用的許可數。 int availablePermits() // 獲取並返回立即可用的所有許可。 int drainPermits() // 返回一個 collection,包含可能等待獲取的執行緒。 protected Collection<Thread> getQueuedThreads() // 返回正在等待獲取的執行緒的估計數目。 int getQueueLength() // 查詢是否有執行緒正在等待獲取。 boolean hasQueuedThreads() // 如果此訊號量的公平設定為 true,則返回 true。 boolean isFair() // 根據指定的縮減量減小可用許可的數目。 protected void reducePermits(int reduction) // 釋放一個許可,將其返回給訊號量。 void release() // 釋放給定數目的許可,將其返回到訊號量。 void release(int permits) // 返回標識此訊號量的字串,以及訊號量的狀態。 String toString() // 僅在呼叫時此訊號量存在一個可用許可,才從訊號量獲取許可。 boolean tryAcquire() // 僅在呼叫時此訊號量中有給定數目的許可時,才從此訊號量中獲取這些許可。 boolean tryAcquire(int permits) // 如果在給定的等待時間內此訊號量有可用的所有許可,並且當前執行緒未被中斷,則從此訊號量獲取給定數目的許可。 boolean tryAcquire(int permits, long timeout, TimeUnit unit) // 如果在給定的等待時間內,此訊號量有可用的許可並且當前執行緒未被中斷,則從此訊號量獲取一個許可。 boolean tryAcquire(long timeout, TimeUnit unit)
建構函式
- 兩個構造方法,都必須提供許可的數量,第二個構造方法可以指定是公平模式還是非公平模式,預設非公平模式。
//permits是允許同時執行的執行緒數目 public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
- Semaphore 內部基於 AQS 的共享模式,所有實現委託給 Sync 類完成。
NonfairSync(int permits) { super(permits); } ...... Sync(int permits) { setState(permits); }
- Semaphore 提供了兩種獲取資源的方式。
- 響應中斷 和 不響應中斷 。
響應中斷獲取資源
- 兩個方法支援 Interrupt 中斷機制,可使用
acquire()
方法每次獲取一個訊號量,也可以使用acquire(int permits)
方法獲取指定數量的訊號量 。
//從semaphore中獲取一個許可,執行緒會一直被阻塞直到獲取一個許可或是被中斷,獲取一個許可後立即返回,並把許可數減1,如果沒有可用的許可,當前執行緒會處於休眠狀態直到 //1.某些其他執行緒呼叫release方法,並且當前執行緒是下一個要被分配許可的執行緒 //2.某些其他執行緒中斷當前執行緒 //如果當前執行緒被acquire方法使得中斷狀態設定為on或者在等待許可時被中斷則丟擲InterruptedException,並且清除當前執行緒的已中斷狀態 public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); }
- AQS 子類使用共享模式,需要實現
tryAcquireShared()
方法。- 在公平鎖中還是與ReentrantLock 中的操作一樣,先判斷同步佇列中是不是還有其他的等待執行緒,有則直接返回失敗。
- 否則對 state 值進行減操作並返回剩下的訊號量。
- 非公平鎖直接呼叫了父類中的 nonfairTryAcquireShared 和 ReentrantLock 一樣。
- 在公平鎖中還是與ReentrantLock 中的操作一樣,先判斷同步佇列中是不是還有其他的等待執行緒,有則直接返回失敗。
// 非公平鎖的獲取方式 protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState();//獲取去中的訊號量數 int remaining = available - acquires;//剩餘訊號量數 //1.訊號量數大於0,獲取共享鎖,並設定執行compareAndSetState(available, remaining),返回剩餘訊號量數 //2.訊號量數小於等於0,直接返回負數 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } // 公平鎖獲取 protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
- 變數 state 採用volatile 可見修飾。
/** * The synchronization state. */ private volatile int state; /** * Returns the current value of synchronization state. * This operation has memory semantics of a <tt>volatile</tt> read. * @return current state value */ protected final int getState() { return state; }
不響應中斷獲取資源
- 兩個方法不響應 Interrupt 中斷機制,其它功能與
acquire()
方法一致。
//從semaphore中獲取一個許可,執行緒會一直被阻塞直到獲取一個許可或是被中斷,獲取一個許可後立即返回,並把許可數減1,如果沒有可用的許可,當前執行緒會處於休眠狀態直到 //1.某些其他執行緒呼叫release方法,並且當前執行緒是下一個要被分配許可的執行緒 //2.如果當前執行緒在等待許可時被中斷,那麼它會接著等待,但是與沒有發生中斷相比,為執行緒分配許可的時間可能改變 ublic void acquireUninterruptibly() { sync.acquireShared(1); } public void acquireUninterruptibly(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.acquireShared(permits); }!
嘗試獲得訊號量
- 嘗試獲得訊號量有三個方法。
- 嘗試獲取訊號量,如果獲取成功則返回 true,否則馬上返回 false,不會阻塞當前執行緒。
- 嘗試獲取訊號量,如果在指定的時間內獲得訊號量,則返回 true,否則返回 false。
- 嘗試獲取指定數量的訊號量,如果在指定的時間內獲得訊號量,則返回 true,否則返回 false。
public boolean tryAcquire() { return sync.nonfairTryAcquireShared(1) >= 0; } public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout)); }
釋放資源
- release 方法,主要作用是釋放資源,需要保證 release 的執行,否則執行緒退出但是資源沒有釋放。
- 一般程式碼寫在 finally 中是最好的。
- 並且獲取多少資源就要釋放多少資源,否則還是資源沒被正確釋放,如果一開始執行了
acquire(10)
最後釋放的時候不能只寫一個release()
而是release(10)
才對。
//嘗試釋放鎖 public final boolean release(int arg) { // 如果釋放鎖成功 喚醒同步佇列中的後繼節點 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } // 為了方便對比把兩個程式碼放在一塊 可以看到 release 中的結構完全一樣 // 區別就在於 doReleaseShared 中有更多的判斷操作 public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared();//在裡面執行的 unparkSuccessor(h) return true; } return false; }
- 子類實現共享模式的類需要實現
tryReleaseShared()
方法判斷是否釋放成功。- 這個方法是一個 CAS 自旋,原因是應為 Semaphore 是一個共享鎖,可能有多個執行緒同時釋放資源,因此 CAS 操作可能失敗。
protected final boolean tryReleaseShared(int releases) { for (;;) { //獲取當前許可數量 int current = getState(); //計算回收後的數量 int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); //CAS改變許可數量成功,返回true if (compareAndSetState(current, next)) return true; } }
- 一旦 CAS 改變許可數量成功,就呼叫
doReleaseShared()
方法釋放阻塞的執行緒。
其他方法
獲取當前剩餘的訊號量數量
- 該方法返回 AQS 中 state 變數的值,當前剩餘的訊號量個數。
public int availablePermits() { return sync.getPermits(); } // Sync final int getPermits() { return getState(); }
耗盡許可數量
- 獲取並返回立即可用的所有許可。
- Sync 類的
drainPermits()
方法,獲取 1 個訊號量後將可用的訊號量個數置為 0。drainPermits()
public int drainPermits() { return sync.drainPermits(); } // Sync final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; } }
縮減許可數量
- 縮減必須是單向的,即只能減少不能增加。
- 用 CAS 自旋在剩餘共享資源上做縮減。
protected void reducePermits(int reduction) { if (reduction < 0) throw new IllegalArgumentException(); sync.reducePermits(reduction); } // Sync final void reducePermits(int reductions) { for (;;) { int current = getState(); int next = current - reductions; if (next > current) // underflow throw new Error("Permit count underflow"); if (compareAndSetState(current, next)) return; } }
- 上述兩個方法對共享資源數量的修改操作有兩點需要注意
- 是不可逆的
- 是對剩餘資源的操作而不是全部資源,當剩餘資源數目不足或已經為 0 時,方法就返回。
- 正在被佔用的資源不參與。
判斷 AQS 同步佇列中是否還有 Node
public final boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } // AbstractQueuedSynchronizer public final boolean hasQueuedThreads() { //頭結點不等於尾節點就說明連結串列中還有元素 return head != tail; }
3. 總結
- Semaphore 的內部工作流程也是基於 AQS,不同於CyclicBarrier 和 ReentrantLock,不會使用到 AQS 的條件佇列,都是在同步佇列中操作,只是當前執行緒會被 park。
- Semaphore 是 JUC 包提供的一個典型的共享鎖,它通過自定義兩種不同的同步器(FairSync 和 NonfairSync)提供了公平和非公平兩種工作模式,兩種模式下分別提供了限時/不限時、響應中斷/不響應中斷的獲取資源的方法(限時獲取總是及時響應中斷的),而所有的釋放資源的
release()
操作是統一的。