1. 程式人生 > >Java併發-深入理解Semaphore(訊號量)之原始碼解析

Java併發-深入理解Semaphore(訊號量)之原始碼解析

深入理解Semaphore(訊號量)

Semaphore藉助AQS

  • Sync 繼承 AbstractQueuedSynchronizer(AQS同步器)
  • NonfairSync Sync的非公平實現
  • FairSync Sync的公平實現

為什麼沒有實現Lock介面?

  • 因為,lock和unlock沒有引數,無法達到此效果

套路(同ReentrantLock)

  • AQS操作,是整合AQS類作為靜態內部類
  • Sync預設是非公平的實現
  • NonfairSync 非公平同步器
  • FairSync 公平同步器

Semaphore的特性

  • Semaphore是一個共享同步佇列
  • Semaphore主要是控住同一時間執行的執行緒數
  • state標識當前執行的執行緒數
  • 是可以響應中斷的

原始碼分析

構造

Semaphore構造

    //預設識非公平同步佇列
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

Semaphore.Sync構造

        //permits表示,限制限號量,此時state==permits
        Sync(int permits) {
            setState(permits);
        }

aquire()方法

    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        //呼叫AQS的acquireSharedInterruptibly
        sync.acquireSharedInterruptibly(permits);
    }
     
    //AQS
    public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            // 這裡呼叫Semaphore的tryAcquireShared方法,如果<0,標識不能獲取該資源鎖,就會入隊
            if (tryAcquireShared(arg) < 0)
                //可以被中斷的入隊操作
                doAcquireSharedInterruptibly(arg);
    }
    //非公平的tryAcquireShared方法呼叫父類的nonfairTryAcquireShared方法
    final int nonfairTryAcquireShared(int acquires) {
                for (;;) {
                    int available = getState();
                    int remaining = available - acquires;
                    //這裡remaining標識剩餘的訊號量
                    //remaining小於0會直接返回,在AQS只有>0的返回值才能獲取資源鎖
                    if (remaining < 0 ||
                        //CAS操作,對state進行賦值,成功設定state,此時訊號量>0,即可獲取資源鎖
                        compareAndSetState(available, remaining))
                        return remaining;
                }
    }
    
    //公平同步器的tryAcquireShared方法
    protected int tryAcquireShared(int acquires) {
                for (;;) {
                    //這裡判斷是否存在前繼節點,存在就不可獲取資源鎖,能獲取資源鎖的之能事head節點
                    if (hasQueuedPredecessors())
                        return -1;
                    //同上操作
                    int available = getState();
                    int remaining = available - acquires;
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                        return remaining;
                }
    }
    

release()方法

    //獲取的時候,傳入的訊號值必須和釋放的訊號值相匹配
    public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }
    
    public final boolean releaseShared(int arg) {
        //首先呼叫tryReleaseShared,該操作是一個資源操作,只可能返回true
        if (tryReleaseShared(arg)) {
        //進行釋放,其實就是講該節點從同步佇列中移除,更新新的head指向的node引用
            doReleaseShared();
            return true;
        }
        return false;
    }
    
    //Syn統一實現了該方法,釋放可以公用的
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                //CAS操作,更新state的值
                if (compareAndSetState(current, next))
                    return true;
            }
        }

Demo

總會是maxCount個執行緒一起執行,這裡訊號量規定了5個,而每次執行緒需要訊號為1,那麼就表示,該訊號量支援5
個執行緒同時執行任務。也支援每個執行緒需要的訊號不同,資源一定的情況下,每個執行緒消耗不一樣,維持系統穩定。

/**
 * describe:
 * E-mail:[email protected]  date:2018/12/17
 *
 * @Since 0.0.1
 */
public class SemaphoreTest {
    //訊號量的permits
    private final int maxCount;
    private final Semaphore semaphore;

    public SemaphoreTest(int maxCount) {
        this.maxCount = maxCount;
        //預設是nofair
        this.semaphore = new Semaphore(maxCount);
    }

    public void add() {
        try {
            //獲取到訊號量
            semaphore.acquire();
            //操作
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semaphore.release();
        }
    }


}

class A extends Thread {
    SemaphoreTest semaphoreTest;

    public A(SemaphoreTest semaphoreTest) {
        this.semaphoreTest = semaphoreTest;
    }

    @Override
    public void run() {
        semaphoreTest.add();
        System.out.println("add:");
    }
}



class Test {
    public static void main(String[] args) {
        SemaphoreTest semaphoreTest = new SemaphoreTest(5);
        for (int i = 0; i < 10; i++) {
            A a = new A(semaphoreTest);
            a.start();
        }
    }
}

GitHub主頁