1. 程式人生 > >【搞定Java併發程式設計】第26篇:Java中的併發工具類之控制併發執行緒數的 Semaphore

【搞定Java併發程式設計】第26篇:Java中的併發工具類之控制併發執行緒數的 Semaphore

上一篇:Java中的併發工具類之同步屏障 CyclicBarrier

本文目錄:

1、獲取許可證

2、釋放許可證


本文轉載自:https://mp.weixin.qq.com/s/LS8YBKpiJnHEY1kMWmwoxg

推薦閱讀:剖析基於併發AQS的共享鎖的實現(基於訊號量Semaphore)【寫的非常好】

Semphore(訊號量)是用來控制同時訪問特定資源的執行緒數量,它通過協調各個執行緒,以保證合理的使用公共資源。

Semaphore是JUC包中比較常用到的一個類,它是AQS共享模式的一個應用,可以允許多個執行緒同時對共享資源進行操作,並且可以有效的控制併發數,利用它可以很好的實現流量控制。

Semaphore提供了一個許可證的概念,可以把這個許可證看作公共汽車車票,只有成功獲取車票的人才能夠上車,並且車票是有一定數量的,不可能毫無限制的發下去,這樣就會導致公交車超載。所以當車票發完的時候(公交車以滿載),其他人就只能等下一趟車了。如果中途有人下車,那麼他的位置將會空閒出來,因此如果這時其他人想要上車的話就又可以獲得車票了。

利用Semaphore可以實現各種池,我們在本篇末尾將會動手寫一個簡易的資料庫連線池。

我們先看下Semaphore這個類的整體內部結構:

public class Semaphore implements java.io.Serializable {
    
        private final Sync sync;

        abstract static class Sync extends AbstractQueuedSynchronizer {...}
	
	static final class NonfairSync extends Sync {...}

        static final class FairSync extends Sync {...}
	
	public Semaphore(int permits) {
            sync = new NonfairSync(permits);
        }
	
	public Semaphore(int permits, boolean fair) {
            sync = fair ? new FairSync(permits) : new NonfairSync(permits);
        }
	
	// 獲取許可證
	public void acquire() throws InterruptedException {...}
	
	// 嘗試獲取許可證
	public boolean tryAcquire() {...}
	
	// 歸還許可證
	public void release() {...}
	
	// 獲取permits個許可證
	public void acquire(int permits) throws InterruptedException {...}
	
	// 嘗試獲取permits個許可證
	public boolean tryAcquire(int permits) {...}
	
	// 歸還permits個許可證
	public void release(int permits) {...}
	
	// 返回此訊號量中當前可用的許可證數
	public int availablePermits() {...}
	
	public int drainPermits() {...}
	
	// 減少reduction個許可證
	protected void reducePermits(int reduction) {...}
	 
	// 是否有執行緒正在等待許可證 
	public final boolean hasQueuedThreads() {...}
	
	// 返回正在等待許可證的執行緒數
	public final int getQueueLength() {...}
	
	// 返回所有等待獲取許可證的執行緒集合
	protected Collection<Thread> getQueuedThreads() {...}
}

可以得出下面關於 Semaphore 的類關係圖:

從上面的Semaphore類結構中可用發現它其實也是 AQS 中共享鎖的使用,因為每個執行緒共享一個池嘛。

套路解讀:建立 Semaphore 例項的時候,需要一個引數 permits,這個基本上可以確定是設定給 AQS 的 state 的,然後每個執行緒呼叫 acquire 的時候,執行 state = state - 1,release 的時候執行 state = state + 1,當然,acquire 的時候,如果 state = 0,說明沒有資源了,需要等待其他執行緒 release。

首先我們來看一下Semaphore的構造器:

// 構造器1
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

// 構造器2
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

Semaphore提供了兩個帶參構造器,沒有提供無參構造器。這兩個構造器都必須傳入一個初始的許可證數量,使用構造器1構造出來的訊號量在獲取許可證時會採用非公平方式獲取,使用構造器2可以通過引數指定獲取許可證的方式(公平or非公平)。

Semaphore主要對外提供了兩類API,獲取許可證釋放許可證,預設的是獲取和釋放一個許可證,也可以傳入引數來同時獲取和釋放多個許可證。在本篇中我們只講每次獲取和釋放一個許可證的情況。

1、獲取許可證

// 獲取一個許可證(響應中斷)
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

// 獲取一個許可證(不響應中斷)
public void acquireUninterruptibly() {
    sync.acquireShared(1);
}

// 嘗試獲取許可證(非公平獲取)
public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;
}

// 嘗試獲取許可證(定時獲取)
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

上面的API是Semaphore提供的預設獲取許可證操作。每次只獲取一個許可證,這也是現實生活中較常遇到的情況。除了直接獲取還提供了嘗試獲取,直接獲取操作在失敗之後可能會阻塞執行緒,而嘗試獲取則不會。另外還需注意的是tryAcquire方法是使用非公平方式嘗試獲取的。在平時我們比較常用到的是acquire方法去獲取許可證。下面我們就來看看它是怎樣獲取的。可以看到acquire方法裡面直接就是呼叫sync.acquireSharedInterruptibly,這個方法是AQS裡面的方法,我們簡單講一下。

// 以可中斷模式獲取鎖(共享模式)
public final void acquireSharedInterruptibly(int arg) 
    throws InterruptedException {
    // 首先判斷執行緒是否中斷, 如果是則丟擲異常
    if (Thread.interrupted()) {
        throw new InterruptedException();
    }
    // 1.嘗試去獲取鎖
    if (tryAcquireShared(arg) < 0) {
        // 2. 如果獲取失敗則進人該方法
        doAcquireSharedInterruptibly(arg);
    }
} 

acquireSharedInterruptibly方法首先就是去呼叫tryAcquireShared方法去嘗試獲取,tryAcquireShared在AQS裡面是抽象方法,FairSync和NonfairSync這兩個派生類實現了該方法的邏輯。FairSync實現的是公平獲取的邏輯,而NonfairSync實現的非公平獲取的邏輯。

abstract static class Sync extends AbstractQueuedSynchronizer {
    // 非公平方式嘗試獲取
    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            // 獲取可用許可證
            int available = getState();
            // 獲取剩餘許可證
            int remaining = available - acquires;
            // 1.如果remaining小於0則直接返回remaining
            // 2.如果remaining大於0則先更新同步狀態再返回remaining
            if (remaining < 0 || compareAndSetState(available, remaining)) {
                return remaining;
            }
        }
    }
}

// 非公平同步器
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    NonfairSync(int permits) {
        super(permits);
    }

    // 嘗試獲取許可證
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}

// 公平同步器
static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;

    FairSync(int permits) {
        super(permits);
    }

    // 嘗試獲取許可證
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            // 判斷同步佇列前面有沒有人排隊
            if (hasQueuedPredecessors()) {
                // 如果有的話就直接返回-1,表示嘗試獲取失敗
                return -1;
            }
            // 獲取可用許可證
            int available = getState();
            // 獲取剩餘許可證
            int remaining = available - acquires;
            // 1.如果remaining小於0則直接返回remaining
            // 2.如果remaining大於0則先更新同步狀態再返回remaining
            if (remaining < 0 || compareAndSetState(available, remaining)) {
                return remaining;
            }
        }
    }
}

裡需要注意的是NonfairSync的tryAcquireShared方法直接呼叫的是nonfairTryAcquireShared方法,這個方法是在父類Sync裡面的。非公平獲取鎖的邏輯是先取出當前同步狀態(同步狀態表示許可證個數),將當前同步狀態減去傳入的引數,如果結果不小於0的話證明還有可用的許可證,那麼就直接使用CAS操作更新同步狀態的值,最後不管結果是否小於0都會返回該結果值。

這裡我們要了解tryAcquireShared方法返回值的含義,返回負數表示獲取失敗,零表示當前執行緒獲取成功但後續執行緒不能再獲取,正數表示當前執行緒獲取成功並且後續執行緒也能夠獲取。我們再來看acquireSharedInterruptibly方法的程式碼。

// 以可中斷模式獲取鎖(共享模式)
public final void acquireSharedInterruptibly(int arg) 
    throws InterruptedException {
    // 首先判斷執行緒是否中斷, 如果是則丟擲異常
    if (Thread.interrupted()) {
        throw new InterruptedException();
    }
    // 1.嘗試去獲取鎖
    // 負數:表示獲取失敗
    // 零值:表示當前執行緒獲取成功, 但是後繼執行緒不能再獲取了
    // 正數:表示當前執行緒獲取成功, 並且後繼執行緒同樣可以獲取成功
    if (tryAcquireShared(arg) < 0) {
        // 2. 如果獲取失敗則進人該方法
        doAcquireSharedInterruptibly(arg);
    }
}

如果返回的remaining小於0的話就代表獲取失敗,因此tryAcquireShared(arg) < 0就為true,所以接下來就會呼叫doAcquireSharedInterruptibly方法,這個方法我們在講AQS的時候講過,它會將當前執行緒包裝成結點放入同步佇列尾部,並且有可能掛起執行緒。這也是當remaining小於0時執行緒會排隊阻塞的原因。

而如果返回的remaining>=0的話就代表當前執行緒獲取成功,因此tryAcquireShared(arg) < 0就為flase,所以就不會再去呼叫doAcquireSharedInterruptibly方法阻塞當前執行緒了。

以上是非公平獲取的整個邏輯,而公平獲取時僅僅是在此之前先去呼叫hasQueuedPredecessors方法判斷同步佇列是否有人在排隊,如果有的話就直接return -1表示獲取失敗,否則才繼續執行下面和非公平獲取一樣的步驟。

2、釋放許可證

// 釋放一個許可證
public void release() {
    sync.releaseShared(1);
}

呼叫release方法是釋放一個許可證,它的操作很簡單,就呼叫了AQS的releaseShared方法,我們來看看這個方法。

// 釋放鎖的操作(共享模式)
public final boolean releaseShared(int arg) {
    // 1.嘗試去釋放鎖
    if (tryReleaseShared(arg)) {
        // 2.如果釋放成功就喚醒其他執行緒
        doReleaseShared();
        return true;
    }
    return false;
}

AQS的releaseShared方法首先呼叫tryReleaseShared方法嘗試釋放鎖,這個方法的實現邏輯在子類Sync裡面。

abstract static class Sync extends AbstractQueuedSynchronizer {
    ...
    // 嘗試釋放操作
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            // 獲取當前同步狀態
            int current = getState();
            // 將當前同步狀態加上傳入的引數
            int next = current + releases;
            // 如果相加結果小於當前同步狀態的話就報錯
            if (next < current) {
                throw new Error("Maximum permit count exceeded");
            }
            // 以CAS方式更新同步狀態的值, 更新成功則返回true, 否則繼續迴圈
            if (compareAndSetState(current, next)) {
                return true;
            }
        }
    }
    ...
}

可以看到tryReleaseShared方法裡面採用for迴圈進行自旋,首先獲取同步狀態,將同步狀態加上傳入的引數,然後以CAS方式更新同步狀態,更新成功就返回true並跳出方法,否則就繼續迴圈直到成功為止,這就是Semaphore釋放許可證的流程。


上一篇:Java中的併發工具類之同步屏障 CyclicBarrier