Java併發-深入理解Semaphore(訊號量)之原始碼解析
阿新 • • 發佈:2018-12-22
深入理解Semaphore(訊號量)
Semaphore藉助AQS
- Sync 繼承 AbstractQueuedSynchronizer(AQS同步器)
- NonfairSync Sync的非公平實現
- FairSync Sync的公平實現
為什麼沒有實現Lock介面?
- 因為,lock和unlock沒有引數,無法達到此效果
套路(同ReentrantLock)
- AQS操作,是整合AQS類作為靜態內部類
- Sync預設是非公平的實現
- NonfairSync 非公平同步器
- FairSync 公平同步器
Semaphore的特性
- Semaphore是一個共享同步佇列
- Semaphore主要是控住同一時間執行的執行緒數
- state標識當前執行的執行緒數
- 是可以響應中斷的
原始碼分析
構造
Semaphore構造
//預設識非公平同步佇列
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
Semaphore.Sync構造
//permits表示,限制限號量,此時state==permits
Sync(int permits) {
setState(permits);
}
aquire()方法
public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); //呼叫AQS的acquireSharedInterruptibly sync.acquireSharedInterruptibly(permits); } //AQS public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 這裡呼叫Semaphore的tryAcquireShared方法,如果<0,標識不能獲取該資源鎖,就會入隊 if (tryAcquireShared(arg) < 0) //可以被中斷的入隊操作 doAcquireSharedInterruptibly(arg); } //非公平的tryAcquireShared方法呼叫父類的nonfairTryAcquireShared方法 final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; //這裡remaining標識剩餘的訊號量 //remaining小於0會直接返回,在AQS只有>0的返回值才能獲取資源鎖 if (remaining < 0 || //CAS操作,對state進行賦值,成功設定state,此時訊號量>0,即可獲取資源鎖 compareAndSetState(available, remaining)) return remaining; } } //公平同步器的tryAcquireShared方法 protected int tryAcquireShared(int acquires) { for (;;) { //這裡判斷是否存在前繼節點,存在就不可獲取資源鎖,能獲取資源鎖的之能事head節點 if (hasQueuedPredecessors()) return -1; //同上操作 int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
release()方法
//獲取的時候,傳入的訊號值必須和釋放的訊號值相匹配
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
public final boolean releaseShared(int arg) {
//首先呼叫tryReleaseShared,該操作是一個資源操作,只可能返回true
if (tryReleaseShared(arg)) {
//進行釋放,其實就是講該節點從同步佇列中移除,更新新的head指向的node引用
doReleaseShared();
return true;
}
return false;
}
//Syn統一實現了該方法,釋放可以公用的
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
//CAS操作,更新state的值
if (compareAndSetState(current, next))
return true;
}
}
Demo
總會是maxCount個執行緒一起執行,這裡訊號量規定了5個,而每次執行緒需要訊號為1,那麼就表示,該訊號量支援5
個執行緒同時執行任務。也支援每個執行緒需要的訊號不同,資源一定的情況下,每個執行緒消耗不一樣,維持系統穩定。
/**
* describe:
* E-mail:[email protected] date:2018/12/17
*
* @Since 0.0.1
*/
public class SemaphoreTest {
//訊號量的permits
private final int maxCount;
private final Semaphore semaphore;
public SemaphoreTest(int maxCount) {
this.maxCount = maxCount;
//預設是nofair
this.semaphore = new Semaphore(maxCount);
}
public void add() {
try {
//獲取到訊號量
semaphore.acquire();
//操作
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}
}
class A extends Thread {
SemaphoreTest semaphoreTest;
public A(SemaphoreTest semaphoreTest) {
this.semaphoreTest = semaphoreTest;
}
@Override
public void run() {
semaphoreTest.add();
System.out.println("add:");
}
}
class Test {
public static void main(String[] args) {
SemaphoreTest semaphoreTest = new SemaphoreTest(5);
for (int i = 0; i < 10; i++) {
A a = new A(semaphoreTest);
a.start();
}
}
}