1. 程式人生 > >JUC 中提供的限流利器-Semaphore(訊號量)

JUC 中提供的限流利器-Semaphore(訊號量)

在 JUC 包下,有一個 Semaphore 類,翻譯成訊號量,Semaphore(訊號量)是用來控制同時訪問特定資源的執行緒數量,它通過協調各個執行緒,以保證合理的使用公共資源。Semaphore 跟鎖(synchronized、Lock)有點相似,不同的地方是,鎖同一時刻只允許一個執行緒訪問某一資源,而 Semaphore 則可以控制同一時刻多個執行緒訪問某一資源。 Semaphore(訊號量)並不是 Java 語言特有的,幾乎所有的併發語言都有。所以也就存在一個**訊號量模型**的概念,如下圖所示: ![訊號量模型](https://user-gold-cdn.xitu.io/2020/3/31/1712e2f202ff468e?w=455&h=341&f=png&s=11610) 訊號量模型比較簡單,可以概括為:**一個計數器、一個佇列、三個方法**。 計數器:記錄當前還可以執行多少個資源訪問資源。 佇列:待訪問資源的執行緒 **三個方法**: - **init()**:初始化計數器的值,可就是允許多少執行緒同時訪問資源。 - **up()**:計數器加1,有執行緒歸還資源時,如果計數器的值大於或者等於 0 時,從等待佇列中喚醒一個執行緒 - **down()**:計數器減 1,有執行緒佔用資源時,如果此時計數器的值小於 0 ,執行緒將被阻塞。 這三個方法都是原子性的,由實現方保證原子性。例如在 Java 語言中,JUC 包下的 Semaphore 實現了訊號量模型,所以 Semaphore 保證了這三個方法的原子性。 Semaphore 是基於 AbstractQueuedSynchronizer 介面實現訊號量模型的。AbstractQueuedSynchronizer 提供了一個基於 FIFO 佇列,可以用於構建鎖或者其他相關同步裝置的基礎框架,利用了一個 int 來表示狀態,通過類似 acquire 和 release 的方式來操縱狀態。關於 AbstractQueuedSynchronizer 更多的介紹,可以點選連結: > http://ifeve.com/introduce-abstractqueuedsynchronizer/ AbstractQueuedSynchronizer 在 Semaphore 類中的實現類如下: ```java abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L; Sync(int permits) { setState(permits); } final int getPermits() { return getState(); } final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } } final void reducePermits(int reductions) { for (;;) { int current = getState(); int next = current - reductions; if (next > current) // underflow throw new Error("Permit count underflow"); if (compareAndSetState(current, next)) return; } } final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; } } } ``` 在 Semaphore 類中,實現了兩種訊號量:**公平的訊號量和非公平的訊號量**,公平的訊號量就是大家排好隊,先到先進,非公平的訊號量就是不一定先到先進,允許插隊。非公平的訊號量效率會高一些,所以預設使用的是非公平訊號量。具體的可以檢視 Semaphore 類實現原始碼。 Semaphore 類中,主要有以下方法: ```java // 構造方法,引數表示許可證數量,用來建立訊號量 public Semaphore(int permits); // 從訊號量中獲取許可,相當於獲取到執行權 public void acquire() throws InterruptedException; // 嘗試獲取1個許可,不管是否能夠獲取成功,都立即返回,true表示獲取成功,false表示獲取失敗 public boolean tryAcquire(); // 將許可還給訊號量 public void release(); ``` Semaphore 類的實現就瞭解的差不多了。可能你會有疑問 Semaphore 的應用場景是什麼?Semaphore 可以用來限流(流量控制),在一些公共資源有限的場景下,Semaphore 可以派上用場。比如在做日誌清洗時,可能有幾十個執行緒在併發清洗,但是將清洗的資料存入到資料庫時,可能只給資料庫分配了 10 個連線池,這樣兩邊的執行緒數就不對等了,我們必須保證同時只能有 10 個執行緒獲取資料庫連結,否則就會存在大量執行緒無法連結上資料庫。 用 Semaphore 訊號量來模擬這操作,程式碼如下: ```java public class SemaphoreDemo { /** * semaphore 訊號量,可以限流 * * 模擬併發資料庫操作,同時有三十個請求,但是系統每秒只能處理 5 個 */ private static final int THREAD_COUNT = 30; private static ExecutorService threadPool = Executors .newFixedThreadPool(THREAD_COUNT); // 初始化訊號量,個數為 5 private static Semaphore s = new Semaphore(5); public static void main(String[] args) { for (int i = 0; i < THREAD_COUNT; i++) { threadPool.execute(new Runnable() { @Override public void run() { try { // 獲取許可 s.acquire(); System.out.println(Thread.currentThread().getName()+" 完成資料庫操作 ,"+ new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format( new Date())); // 休眠兩秒鐘,效果更直觀 Thread.sleep(2000); // 釋放許可 s.release(); } catch (InterruptedException e) { } } }); } // 關閉連線池 threadPool.shutdown(); } } ``` 執行效果如下: ![圖片描述](https://user-gold-cdn.xitu.io/2020/3/31/1712e2f203261093?w=674&h=468&f=png&s=94596) 從結果中,可以看出,每秒只有 5 個執行緒在執行,這符合我們的預期。 好了,關於 Semaphore 的內容就結束了,更加詳細的還請您查閱相關資料和閱讀 Semaphore 原始碼。希望這篇文章對您的學習或者工作有所幫助。 感謝您的閱讀,祝好。 ### 最後 > 目前網際網路上很多大佬都有 Semaphore(訊號量) 相關文章,如有雷同,請多多包涵了。原創不易,碼字不易,還希望大家多多支援。若文中有所錯誤之處,還望提出,謝謝。 ![網際網路平頭哥](https://user-gold-cdn.xitu.io/2020/2/1/16fffec0e10797f6?w=900&h=500&f=png&s