Java併發包中Semaphore的工作原理、原始碼分析及使用示例
簡介:
在多執行緒程式設計中有三個同步工具需要我們掌握,分別是Semaphore(訊號量),countDownLatch(倒計數門閘鎖),CyclicBarrier(可重用柵欄)
歡迎探討,如有錯誤敬請指正
如需轉載,請註明出處 http://www.cnblogs.com/nullzx/
1. 訊號量Semaphore的介紹
我們以一個停車場運作為例來說明訊號量的作用。假設停車場只有三個車位,一開始三個車位都是空的。這時如果同時來了三輛車,看門人允許其中它們進入進入,然後放下車攔。以後來的車必須在入口等待,直到停車場中有車輛離開。這時,如果有一輛車離開停車場,看門人得知後,開啟車攔,放入一輛,如果又離開一輛,則又可以放入一輛,如此往復。
在這個停車場系統中,車位是公共資源,每輛車好比一個執行緒,看門人起的就是訊號量的作用。訊號量是一個非負整數,表示了當前公共資源的可用數目(在上面的例子中可以用空閒的停車位類比訊號量),當一個執行緒要使用公共資源時(在上面的例子中可以用車輛類比執行緒),首先要檢視訊號量,如果訊號量的值大於1,則將其減1,然後去佔有公共資源。如果訊號量的值為0,則執行緒會將自己阻塞,直到有其它執行緒釋放公共資源。
在訊號量上我們定義兩種操作: acquire(獲取) 和 release(釋放)。當一個執行緒呼叫acquire操作時,它要麼通過成功獲取訊號量(訊號量減1),要麼一直等下去,直到有執行緒釋放訊號量,或超時。release(釋放)實際上會將訊號量的值加1,然後喚醒等待的執行緒。
訊號量主要用於兩個目的,一個是用於多個共享資源的互斥使用,另一個用於併發執行緒數的控制。
2. 訊號量Semaphore的原始碼分析
在Java的併發包中,Semaphore類表示訊號量。Semaphore內部主要通過AQS(AbstractQueuedSynchronizer)實現執行緒的管理。Semaphore有兩個建構函式,引數permits表示許可數,它最後傳遞給了AQS的state值。執行緒在執行時首先獲取許可,如果成功,許可數就減1,執行緒執行,當執行緒執行結束就釋放許可,許可數就加1。如果許可數為0,則獲取失敗,執行緒位於AQS的等待佇列中,它會被其它釋放許可的執行緒喚醒。在建立Semaphore物件的時候還可以指定它的公平性。一般常用非公平的訊號量,非公平訊號量是指在獲取許可時先嚐試獲取許可,而不必關心是否已有需要獲取許可的執行緒位於等待佇列中,如果獲取失敗,才會入列。而公平的訊號量在獲取許可時首先要檢視等待佇列中是否已有執行緒,如果有則入列。
建構函式原始碼
//非公平的建構函式
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
//通過fair引數決定公平性
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
acquire原始碼
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
可以看出,如果remaining <0 即獲取許可後,許可數小於0,則獲取失敗,在doAcquireSharedInterruptibly方法中執行緒會將自身阻塞,然後入列。
release原始碼
public void release() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
可以看出釋放許可就是將AQS中state的值加1。然後通過doReleaseShared喚醒等待佇列的第一個節點。可以看出Semaphore使用的是AQS的共享模式,等待佇列中的第一個節點,如果第一個節點成功獲取許可,又會喚醒下一個節點,以此類推。
3. 使用示例
package javalearning;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class SemaphoreDemo {
private Semaphore smp = new Semaphore(3);
private Random rnd = new Random();
class TaskDemo implements Runnable{
private String id;
TaskDemo(String id){
this.id = id;
}
@Override
public void run(){
try {
smp.acquire();
System.out.println("Thread " + id + " is working");
Thread.sleep(rnd.nextInt(1000));
smp.release();
System.out.println("Thread " + id + " is over");
} catch (InterruptedException e) {
}
}
}
public static void main(String[] args){
SemaphoreDemo semaphoreDemo = new SemaphoreDemo();
//注意我建立的執行緒池型別,
ExecutorService se = Executors.newCachedThreadPool();
se.submit(semaphoreDemo.new TaskDemo("a"));
se.submit(semaphoreDemo.new TaskDemo("b"));
se.submit(semaphoreDemo.new TaskDemo("c"));
se.submit(semaphoreDemo.new TaskDemo("d"));
se.submit(semaphoreDemo.new TaskDemo("e"));
se.submit(semaphoreDemo.new TaskDemo("f"));
se.shutdown();
}
}
執行結果
Thread c is working
Thread b is working
Thread a is working
Thread c is over
Thread d is working
Thread b is over
Thread e is working
Thread a is over
Thread f is working
Thread d is over
Thread e is over
Thread f is over
可以看出,最多同時有三個執行緒併發執行,也可以認為有三個公共資源(比如計算機的三個串列埠)。
4. 參考內容
[1] http://my.oschina.net/cloudcoder/blog/362974