1. 程式人生 > >淺析Java併發工具類Semaphore

淺析Java併發工具類Semaphore

淺析Java併發工具類Semaphore

1. 概述

       Semaphore類表示訊號量。Semaphore內部主要通過AQS(AbstractQueuedSynchronizer)實現執行緒的管理。執行緒在執行時首先獲取許可,如果成功,許可數就減1,執行緒執行,當執行緒執行結束就釋放許可,許可數就加1。如果許可數為0,則獲取失敗,執行緒位於AQS的等待佇列中,它會被其它釋放許可的執行緒喚醒。

在建立Semaphore物件的時候還可以指定它的公平性。

  • 非公平訊號量是指在嘗試獲取許可時,不必關心是否還有需要獲取許可的執行緒位於等待佇列中。
  • 公平的訊號量在獲取許可時首先要檢視等待佇列中是否已有執行緒,如果有則不能進行獲取(即保證了FCFS)。

一般常用非公平的訊號量,可以避免執行緒被CPU選中執行而又被加入等待佇列,提高了效率。

2. 原始碼分析

Semaphore有兩個建構函式:

public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
    
public Semaphore(int permits,
boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }

permits表示許可數,它最後傳遞給了AQS的state值。

        Sync(int permits) {
            setState(permits);
        }

fair即為表示執行緒獲取訊號量是否保證公平性。

可以看到在等待佇列中若有執行緒在等待,FairSync是不會獲取訊號量的。

    static final class
FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { for (;;) { // The point if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }

而NonfairSync則不然:

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

這就驗證了上文於公平性的討論。

3. 一個例子

public class SemaphoreDemo {
	private static Semaphore semp = new Semaphore(5);

	public static void main(String[] args) {
		ExecutorService exec = Executors.newCachedThreadPool();
		for (int index = 0; index < 20; index++) {
			final int id = index;
			Runnable task = new Runnable() {
				public void run() {
					try {
						// 獲取許可
						semp.acquire();
						System.out.println(Thread.currentThread().getName() + " Accessing: " + id + " " + " 當前 等待佇列大小:"
								+ semp.getQueueLength());
						Thread.sleep((long) (Math.random() * 3000));
						semp.release();
					} catch (InterruptedException e) {
					}
				}
			};
			exec.execute(task);
		}
		exec.shutdown();
	}

}

需要注意的一點是,訊號量可以由一個執行緒使用,然後由另一個執行緒來進行釋放,如下所示:

public class UseSemaphore {
	private static Semaphore semp = new Semaphore(5);

	public static void main(String[] args) throws InterruptedException {
		ExecutorService exec = Executors.newCachedThreadPool();
		for (int index = 0; index < 20; index++) {
			final int id = index;
			Runnable task = new Runnable() {
				public void run() {
					try {
						// 獲取許可
						semp.acquire();
						System.out.println(Thread.currentThread().getName() + " Accessing: " + id + " " + " 當前 等待佇列大小:"
								+ semp.getQueueLength());
						// Thread.sleep((long) (Math.random() * 3000));
					} catch (InterruptedException e) {
					}
				}
			};
			exec.execute(task);
		}
		while (semp.availablePermits() != 5) {
			Thread.sleep(1000);
			semp.release();
		}
		exec.shutdown();
	}

}

4. 總結

Semaphore適合用來控制同時併發訪問數,通過設定合理的訊號量許可數就可以進行有效的控制客戶端同時訪問的數目。