1. 程式人生 > >Semaphore實現原理分析

Semaphore實現原理分析

業務需求 err java並發 裏的 eas static 默認 rem lac

synchronized的語義是互斥鎖,就是在同一時刻,只有一個線程能獲得執行代碼的鎖。但是現實生活中,有好多的場景,鎖不止一把。

比如說,又到了十一假期,買票是重點,必須圈起來。在購票大廳裏,有5個售票窗口,也就是說同一時刻可以服務5個人。要實現這種業務需求,用synchronized顯然不合適。

查看Java並發工具,發現有一個Semaphore類,天生就是處理這種情況的。

先用Semaphore實現一個購票的小例子,來看看如何使用

package semaphore;

import java.util.concurrent.Semaphore;

public class Ticket {

    
public static void main(String[] args) { Semaphore windows = new Semaphore(5); // 聲明5個窗口 for (int i = 0; i < 8; i++) { new Thread() { @Override public void run() { try { windows.acquire();
// 占用窗口 System.out.println(Thread.currentThread().getName() + ": 開始買票"); sleep(2000); // 睡2秒,模擬買票流程 System.out.println(Thread.currentThread().getName() + ": 購票成功"); windows.release(); // 釋放窗口 } catch
(InterruptedException e) { e.printStackTrace(); } } }.start(); } } }

運行結果

技術分享
Thread-1: 開始買票
Thread-3: 開始買票
Thread-4: 開始買票
Thread-2: 開始買票
Thread-0: 開始買票
Thread-1: 購票成功
Thread-5: 開始買票
Thread-3: 購票成功
Thread-2: 購票成功
Thread-4: 購票成功
Thread-7: 開始買票
Thread-6: 開始買票
Thread-0: 購票成功
Thread-7: 購票成功
Thread-5: 購票成功
Thread-6: 購票成功
View Code

從結果來看,最多只有5個線程在購票。而這麽精確的控制,我們也只是調用了acquire和release方法。下面看看是如何實現的。

從acquire方法進去,又可以看到老套路:具體調用的還是AbstractQueuedSynchronizer這個類的邏輯

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
而tryAcquireShared方法留給了子類去實現,Semaphore類裏面的兩個內部類FairSync和NonfairSync都繼承自AbstractQueuedSynchronizer。這兩個內部類,從名字來看,一個實現了公平鎖,另一個是非公平鎖。這裏多說一句,所謂公平和非公平是這個意思:假設現在有一個線程A在等待獲取鎖,這時候又來了線程B,如果這個時候B不考慮A的感受,也去申請鎖,顯然不公平;反之,只要A是先來的,B一定要排在A的後面,不能馬上去申請鎖,就是公平的。
Semaphore默認是調用了NonfairSync的tryAcquireShared方法,主要邏輯:
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

這又是一個經典的CAS操作加無限循環的算法,用來保證共享變量的正確性。另外,此處的getState()方法很是迷惑人,你以為是獲取狀態,實則不然。我們先看看Semaphore的構造方法:

    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
        // 內部類
        NonfairSync(int permits) {
            super(permits);
        }
        // 內部類,NonfairSync的父類
        Sync(int permits) {
            setState(permits);
        }

我們傳進去的參數5,最終傳給了setState方法,而getState和setState方法都在AbstractQueuedSynchronizer類裏面

    /**
     * The synchronization state.
     */
    private volatile int state;

    protected final int getState() {
        return state;
    }

    protected final void setState(int newState) {
        state = newState;
    }

也就是說父類定義了一個屬性state,並配有final的get和set方法,子類只需要繼承該屬性,想代表什麽含義都可以,比如Semaphore裏面的內部類Sync就把這個屬性當作最大允許訪問的permits,像CountDownLatch和CyclicBarrier都是這麽幹的。這種方式似乎不太好理解,為什麽不是每個子類都定義自己的具有明確語義的屬性,而是把控制權放在父類???我猜是出於安全的考慮。反正,大師的思考深度,我們揣摩不了。

再回到tryAcquireShared方法,這個方法是有參數的---int型的acquires,代表你要一次占幾個坑。我們調用的無參的acquire方法,默認是傳入1作為參數調用的這個方法,一次只申請一個坑。但是有的情況下,你可能一次需要多個,比如高富帥需要同時交多個女朋友。方法的返回值是剩余的坑的數量,如果數量小於0,執行AbstractQueuedSynchronizer這個類的doAcquireSharedInterruptibly方法。

    /**
     * Acquires in shared interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

這個方法的邏輯與獨占模式下的邏輯差不多,可以看看之前講Condition的那篇,出門一路左拐。當所有的坑都被占著的時候,再來的線程都會被封裝成節點,添加到等待的隊列裏面去。不同的是,這裏的節點都是共享模式,而共享模式是實現多個坑同時提供服務的核心。

再來看看坑的釋放,從release方法進去,核心邏輯在tryReleaseShared方法:

        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

CAS、無限循環,熟悉的配方,熟悉的味道。同獲取一樣,這裏也可以一次釋放多個坑。然而,這裏考慮到了next小於current的情況,我是絞盡腦汁也沒想出來。傳進來的releases一般都是大於0的整數(大部分情況下就是1),最終還是會造成next小於current,實在是想不出來,而且還是拋出Error。但是這種情況,通過代碼可以精確的再現。好吧,是在下輸了。如果讀者中有高人,請指點一二,不勝感激!!!

前面這麽多都只是分析了非公平模式下的處理邏輯,而公平模式下的邏輯多了一個判斷,就是看看前面還有沒有線程在等待(節點有沒有前驅)。具體的細節,希望讀者自己玩味。

最後總結一下:所有的並發核心控制邏輯都在AbstractQueuedSynchronizer這個類中,只有理解了這個類的設計思路,才能真正理解衍生出來的工具類的實現原理。

 

Semaphore實現原理分析