1. 程式人生 > >Dubbo原始碼分析:RPC協議實現-服務端併發控制與Semaphore訊號量

Dubbo原始碼分析:RPC協議實現-服務端併發控制與Semaphore訊號量

概述

Dubbo支援在服務端通過在service或者method,通過executes引數設定每個方法,允許併發呼叫的最大執行緒數,即在任何時刻,只允許executes個執行緒同時呼叫該方法,超過的則拋異常返回,從而對提供者服務進行併發控制,保護資源。

用法

服務級別

限制 com.foo.BarService 的每個方法,伺服器端併發執行(或佔用執行緒池執行緒數)不能超過 10 個:
<dubbo:service interface=“com.foo.BarService” executes=“10” />

方法級別

限制 com.foo.BarService 的 sayHello 方法,伺服器端併發執行(或佔用執行緒池執行緒數)不能超過 10 個:
<dubbo:service interface=“com.foo.BarService”>
<dubbo:method name=“sayHello” executes=“10” />
</dubbo:service>

原始碼實現

  • Java Semaphore:訊號量
    • 定義:Java訊號量類似於一個計數器,用於限制在任何時刻,只允許給定個執行緒對某個共享資源的訪問。
    • 用法:
  1. 獲取訊號量:每個執行緒通過執行tryAcquire(非阻塞)或者acquire(阻塞,可中斷)獲取一個訊號量或者說是通行證,同時將訊號總量減一,當數量變為0時,則後面來的執行緒獲取則返回false或者阻塞;
  2. 釋放訊號量:執行緒對併發資源訪問完畢之後,通過呼叫relase方法將訊號總量加1,允許一個執行緒訪問該共享資源;
  • ExecuteLimitFilter:執行緒併發控制過濾器
    在這裡插入圖片描述
  1. 服務端每個執行緒處理RPC請求,進行provider方法呼叫時,該執行緒執行ExecuteLimitFilter的invoke方法,定義Semaphore型別的executeLimit的區域性變數,該區域性變數的主要作用是用於引用與該provider方法繫結的RpcStatus,該RpcStatus由所有呼叫該provider方法的執行緒共享;
  2. 呼叫count.getSemaphore(max)獲取該provider方法(即共享資源)的訊號量:
    在這裡插入圖片描述
  • 通過synchronized和double check來保證訊號量只初始化一次或者當前呼叫的url指定的executes數量變化時,更新。注意executesLimit為RpcStatus的例項的field,而單個provider的方法所繫結的rpcStatus是又所有呼叫這個方法的執行緒共享的,故為了保證該executesLimit對其他執行緒的可見性,從而在其中一個執行緒初始化之後,其他執行緒在進入synchronized塊,再次判斷executeLimit是否為null時,感知其他執行緒已經初始化過了,需要將executesLimit設定為volatile:
    在這裡插入圖片描述
  • 呼叫Semaphore的tryAcquire獲取一個訊號量,如果獲取到則進行往下執行進行provider的方法呼叫,否則拋異常返回。
    在這裡插入圖片描述
    其中tryAcquire為非阻塞的,直接返回true或者false,當前執行緒進行往下執行,需要在程式中控制是否獲取成功的處理,並且是unfair的。
    在這裡插入圖片描述