1. 程式人生 > >Java - "JUC" Semaphore源碼分析

Java - "JUC" Semaphore源碼分析

fin code perm exc static else if tool 過程 每一個

Java多線程系列--“JUC鎖”11之 Semaphore信號量的原理和示例

Semaphore簡介

Semaphore是一個計數信號量,它的本質是一個"共享鎖"。

信號量維護了一個信號量許可集。線程可以通過調用acquire()來獲取信號量的許可;當信號量中有可用的許可時,線程能獲取該許可;否則線程必須等待,直到有可用的許可為止。 線程可以通過release()來釋放它所持有的信號量許可。

Java並發提供了兩種加鎖模式:共享鎖和獨占鎖。前面LZ介紹的ReentrantLock就是獨占鎖。對於獨占鎖而言,它每次只能有一個線程持有,而共享鎖則不同,它允許多個線程並行持有鎖,並發訪問共享資源。

獨占鎖它所采用的是一種悲觀的加鎖策略, 對於寫而言為了避免沖突獨占是必須的,但是對於讀就沒有必要了,因為它不會影響數據的一致性。如果某個只讀線程獲取獨占鎖,則其他讀線程都只能等待了,這種情況下就限制了不必要的並發性,降低了吞吐量。而共享鎖則不同,它放寬了加鎖的條件,采用了樂觀鎖機制,它是允許多個讀線程同時訪問同一個共享資源的。

Semaphore,在API中是這樣介紹的,一個計數信號量。從概念上講,信號量維護了一個許可集。如有必要,在許可可用前會阻塞每一個 acquire(),然後再獲取該許可。每個 release() 添加一個許可,從而可能釋放一個正在阻塞的獲取者。但是,不使用實際的許可對象,Semaphore 只對可用許可的號碼進行計數,並采取相應的行動。

Semaphore 通常用於限制可以訪問某些資源(物理或邏輯的)的線程數目。下面LZ以理發為例來簡述Semaphore。

為了簡單起見,我們假設只有三個理發師、一個接待人。一開始來了五個客人,接待人則安排三個客人進行理發,其余兩個人必須在那裏等著,此後每個來理發店的人都必須等待。一段時間後,一個理發師完成理發後,接待人則安排另一個人(公平還是非公平機制呢??)來理發。在這裏理發師則相當於公共資源,接待人則相當於信號量(Semaphore),客戶相當於線程。

進一步講,我們確定信號量Semaphore是一個非負整數(>=1)。當一個線程想要訪問某個共享資源時,它必須要先獲取Semaphore,當Semaphore >0時,獲取該資源並使Semaphore – 1。如果Semaphore值 = 0,則表示全部的共享資源已經被其他線程全部占用,線程必須要等待其他線程釋放資源。當線程釋放資源時,Semaphore則+1;

當信號量Semaphore = 1 時,它可以當作互斥鎖使用。其中0、1就相當於它的狀態,當=1時表示其他線程可以獲取,當=0時,排他,即其他線程必須要等待。


Semaphore的函數列表

技術分享
// 創建具有給定的許可數和非公平的公平設置的 Semaphore。
Semaphore(int permits)
// 創建具有給定的許可數和給定的公平設置的 Semaphore。
Semaphore(int permits, boolean fair)

// 從此信號量獲取一個許可,在提供一個許可前一直將線程阻塞,否則線程被中斷。
void acquire()
// 從此信號量獲取給定數目的許可,在提供這些許可前一直將線程阻塞,或者線程已被中斷。
void acquire(int permits)
// 從此信號量中獲取許可,在有可用的許可前將其阻塞。
void acquireUninterruptibly()
// 從此信號量獲取給定數目的許可,在提供這些許可前一直將線程阻塞。
void acquireUninterruptibly(int permits)
// 返回此信號量中當前可用的許可數。
int availablePermits()
// 獲取並返回立即可用的所有許可。
int drainPermits()
// 返回一個 collection,包含可能等待獲取的線程。
protected Collection<Thread> getQueuedThreads()
// 返回正在等待獲取的線程的估計數目。
int getQueueLength()
// 查詢是否有線程正在等待獲取。
boolean hasQueuedThreads()
// 如果此信號量的公平設置為 true,則返回 true。
boolean isFair()
// 根據指定的縮減量減小可用許可的數目。
protected void reducePermits(int reduction)
// 釋放一個許可,將其返回給信號量。
void release()
// 釋放給定數目的許可,將其返回到信號量。
void release(int permits)
// 返回標識此信號量的字符串,以及信號量的狀態。
String toString()
// 僅在調用時此信號量存在一個可用許可,才從信號量獲取許可。
boolean tryAcquire()
// 僅在調用時此信號量中有給定數目的許可時,才從此信號量中獲取這些許可。
boolean tryAcquire(int permits)
// 如果在給定的等待時間內此信號量有可用的所有許可,並且當前線程未被中斷,則從此信號量獲取給定數目的許可。
boolean tryAcquire(int permits, long timeout, TimeUnit unit)
// 如果在給定的等待時間內,此信號量有可用的許可並且當前線程未被中斷,則從此信號量獲取一個許可。
boolean tryAcquire(long timeout, TimeUnit unit)
技術分享

Semaphore數據結構

Semaphore的UML類圖如下:

技術分享

從圖中可以看出:
(01) 和"ReentrantLock"一樣,Semaphore也包含了sync對象,sync是Sync類型;而且,Sync是一個繼承於AQS的抽象類。
(02) Sync包括兩個子類:"公平信號量"FairSync 和 "非公平信號量"NonfairSync。sync是"FairSync的實例",或者"NonfairSync的實例";默認情況下,sync是NonfairSync(即,默認是非公平信號量)。

Semaphore示例

技術分享
 1 import java.util.concurrent.ExecutorService; 
 2 import java.util.concurrent.Executors; 
 3 import java.util.concurrent.Semaphore; 
 4 
 5 public class SemaphoreTest1 { 
 6     private static final int SEM_MAX = 10;
 7     public static void main(String[] args) { 
 8         Semaphore sem = new Semaphore(SEM_MAX);
 9         //創建線程池
10         ExecutorService threadPool = Executors.newFixedThreadPool(3);
11         //在線程池中執行任務
12         threadPool.execute(new MyThread(sem, 5));
13         threadPool.execute(new MyThread(sem, 4));
14         threadPool.execute(new MyThread(sem, 7));
15         //關閉池
16         threadPool.shutdown();
17     }
18 }
19 
20 class MyThread extends Thread {
21     private volatile Semaphore sem;    // 信號量
22     private int count;        // 申請信號量的大小 
23 
24     MyThread(Semaphore sem, int count) {
25         this.sem = sem;
26         this.count = count;
27     }
28 
29     public void run() {
30         try {
31             // 從信號量中獲取count個許可
32             sem.acquire(count);
33 
34             Thread.sleep(2000);
35             System.out.println(Thread.currentThread().getName() + " acquire count="+count);
36         } catch (InterruptedException e) {
37             e.printStackTrace();
38         } finally {
39             // 釋放給定數目的許可,將其返回到信號量。
40             sem.release(count);
41             System.out.println(Thread.currentThread().getName() + " release " + count + "");
42         }
43     }
44 }
技術分享

(某一次)運行結果:

pool-1-thread-1 acquire count=5
pool-1-thread-2 acquire count=4
pool-1-thread-1 release 5
pool-1-thread-2 release 4
pool-1-thread-3 acquire count=7
pool-1-thread-3 release 7

結果說明:信號量sem的許可總數是10個;共3個線程,分別需要獲取的信號量許可數是5,4,7。前面兩個線程獲取到信號量的許可後,sem中剩余的可用的許可數是1;因此,最後一個線程必須等前兩個線程釋放了它們所持有的信號量許可之後,才能獲取到7個信號量許可。

Semaphore源碼分析(基於JDK1.7.0_40)

Semaphore完整源碼(基於JDK1.7.0_40)

技術分享 View Code

Semaphore是通過共享鎖實現的。根據共享鎖的獲取原則,Semaphore分為"公平信號量"和"非公平信號量"。


"公平信號量"和"非公平信號量"的區別

"公平信號量"和"非公平信號量"的釋放信號量的機制是一樣的!不同的是它們獲取信號量的機制:線程在嘗試獲取信號量許可時,對於公平信號量而言,如果當前線程不在CLH隊列的頭部,則排隊等候;而對於非公平信號量而言,無論當前線程是不是在CLH隊列的頭部,它都會直接獲取信號量。該差異具體的體現在,它們的tryAcquireShared()函數的實現不同。

"公平信號量"類

技術分享 View Code

"非公平信號量"類

技術分享 View Code

下面,我們逐步的對它們的源碼進行分析。


1. 信號量構造函數

技術分享
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
技術分享

從中,我們可以信號量分為“公平信號量(FairSync)”和“非公平信號量(NonfairSync)”。Semaphore(int permits)函數會默認創建“非公平信號量”。


2. 公平信號量獲取和釋放

2.1 公平信號量的獲取
Semaphore中的公平信號量是FairSync。它的獲取API如下:

技術分享
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}
技術分享

信號量中的acquire()獲取函數,實際上是調用的AQS中的acquireSharedInterruptibly()。

acquireSharedInterruptibly()的源碼如下:

技術分享
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // 如果線程是中斷狀態,則拋出異常。
    if (Thread.interrupted())
        throw new InterruptedException();
    // 否則,嘗試獲取“共享鎖”;獲取成功則直接返回,獲取失敗,則通過doAcquireSharedInterruptibly()獲取。
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
技術分享

Semaphore中”公平鎖“對應的tryAcquireShared()實現如下:

技術分享
protected int tryAcquireShared(int acquires) {
    for (;;) {
        // 判斷“當前線程”是不是CLH隊列中的第一個線程線程,
        // 若是的話,則返回-1。
        if (hasQueuedPredecessors())
            return -1;
        // 設置“可以獲得的信號量的許可數”
        int available = getState();
        // 設置“獲得acquires個信號量許可之後,剩余的信號量許可數”
        int remaining = available - acquires;
        // 如果“剩余的信號量許可數>=0”,則設置“可以獲得的信號量許可數”為remaining。
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
技術分享

說明:tryAcquireShared()的作用是嘗試獲取acquires個信號量許可數。
對於Semaphore而言,state表示的是“當前可獲得的信號量許可數”。

下面看看AQS中doAcquireSharedInterruptibly()的實現:

技術分享
private void doAcquireSharedInterruptibly(long arg)
    throws InterruptedException {
    // 創建”當前線程“的Node節點,且Node中記錄的鎖是”共享鎖“類型;並將該節點添加到CLH隊列末尾。
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            // 獲取上一個節點。
            // 如果上一節點是CLH隊列的表頭,則”嘗試獲取共享鎖“。
            final Node p = node.predecessor();
            if (p == head) {
                long r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 當前線程一直等待,直到獲取到共享鎖。
            // 如果線程在等待過程中被中斷過,則再次中斷該線程(還原之前的中斷狀態)。
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
技術分享

說明:doAcquireSharedInterruptibly()會使當前線程一直等待,直到當前線程獲取到共享鎖(或被中斷)才返回。
(01) addWaiter(Node.SHARED)的作用是,創建”當前線程“的Node節點,且Node中記錄的鎖的類型是”共享鎖“(Node.SHARED);並將該節點添加到CLH隊列末尾。關於Node和CLH在"Java多線程系列--“JUC鎖”03之 公平鎖(一)"已經詳細介紹過,這裏就不再重復說明了。
(02) node.predecessor()的作用是,獲取上一個節點。如果上一節點是CLH隊列的表頭,則”嘗試獲取共享鎖“。
(03) shouldParkAfterFailedAcquire()的作用和它的名稱一樣,如果在嘗試獲取鎖失敗之後,線程應該等待,則返回true;否則,返回false。
(04) 當shouldParkAfterFailedAcquire()返回ture時,則調用parkAndCheckInterrupt(),當前線程會進入等待狀態,直到獲取到共享鎖才繼續運行。
doAcquireSharedInterruptibly()中的shouldParkAfterFailedAcquire(), parkAndCheckInterrupt等函數在"Java多線程系列--“JUC鎖”03之 公平鎖(一)"中介紹過,這裏也就不再詳細說明了。


2.2 公平信號量的釋放

Semaphore中公平信號量(FairSync)的釋放API如下:

技術分享
public void release() {
    sync.releaseShared(1);
}

public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}
技術分享

信號量的releases()釋放函數,實際上是調用的AQS中的releaseShared()。

releaseShared()在AQS中實現,源碼如下:

技術分享
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
技術分享

說明:releaseShared()的目的是讓當前線程釋放它所持有的共享鎖。
它首先會通過tryReleaseShared()去嘗試釋放共享鎖。嘗試成功,則直接返回;嘗試失敗,則通過doReleaseShared()去釋放共享鎖。

Semaphore重寫了tryReleaseShared(),它的源碼如下:

技術分享
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        // 獲取“可以獲得的信號量的許可數”
        int current = getState();
        // 獲取“釋放releases個信號量許可之後,剩余的信號量許可數”
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        // 設置“可以獲得的信號量的許可數”為next。
        if (compareAndSetState(current, next))
            return true;
    }
}
技術分享

如果tryReleaseShared()嘗試釋放共享鎖失敗,則會調用doReleaseShared()去釋放共享鎖。doReleaseShared()的源碼如下:

技術分享
private void doReleaseShared() {
    for (;;) {
        // 獲取CLH隊列的頭節點
        Node h = head;
        // 如果頭節點不為null,並且頭節點不等於tail節點。
        if (h != null && h != tail) {
            // 獲取頭節點對應的線程的狀態
            int ws = h.waitStatus;
            // 如果頭節點對應的線程是SIGNAL狀態,則意味著“頭節點的下一個節點所對應的線程”需要被unpark喚醒。
            if (ws == Node.SIGNAL) {
                // 設置“頭節點對應的線程狀態”為空狀態。失敗的話,則繼續循環。
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                // 喚醒“頭節點的下一個節點所對應的線程”。
                unparkSuccessor(h);
            }
            // 如果頭節點對應的線程是空狀態,則設置“文件點對應的線程所擁有的共享鎖”為其它線程獲取鎖的空狀態。
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // 如果頭節點發生變化,則繼續循環。否則,退出循環。
        if (h == head)                   // loop if head changed
            break;
    }
}
技術分享

說明:doReleaseShared()會釋放“共享鎖”。它會從前往後的遍歷CLH隊列,依次“喚醒”然後“執行”隊列中每個節點對應的線程;最終的目的是讓這些線程釋放它們所持有的信號量。

3 非公平信號量獲取和釋放

Semaphore中的非公平信號量是NonFairSync。在Semaphore中,“非公平信號量許可的釋放(release)”與“公平信號量許可的釋放(release)”是一樣的。
不同的是它們獲取“信號量許可”的機制不同,下面是非公平信號量獲取信號量許可的代碼。

非公平信號量的tryAcquireShared()實現如下:

protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}

nonfairTryAcquireShared()的實現如下:

技術分享
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        // 設置“可以獲得的信號量的許可數”
        int available = getState();
        // 設置“獲得acquires個信號量許可之後,剩余的信號量許可數”
        int remaining = available - acquires;
        // 如果“剩余的信號量許可數>=0”,則設置“可以獲得的信號量許可數”為remaining。
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
技術分享

說明:非公平信號量的tryAcquireShared()調用AQS中的nonfairTryAcquireShared()。而在nonfairTryAcquireShared()的for循環中,它都會直接判斷“當前剩余的信號量許可數”是否足夠;足夠的話,則直接“設置可以獲得的信號量許可數”,進而再獲取信號量。
而公平信號量的tryAcquireShared()中,在獲取信號量之前會通過if (hasQueuedPredecessors())來判斷“當前線程是不是在CLH隊列的頭部”,是的話,則返回-1。

Java - "JUC" Semaphore源碼分析