1. 程式人生 > >原始碼分析Dubbo服務提供者、服務消費者併發度控制機制

原始碼分析Dubbo服務提供者、服務消費者併發度控制機制

   本文將詳細分析< dubbo:service executes=”“/>與< dubbo:reference actives = “”/>的實現機制,深入探討Dubbo自身的保護機制。 1、原始碼分析ExecuteLimitFilter @Activate(group = Constants.PROVIDER, value = Constants.EXECUTES_KEY )

  • 過濾器作用    服務呼叫方併發度控制。
  • 使用場景    對Dubbo服務提供者實現的一種保護機制,控制每個服務的最大併發度。
  • 阻斷條件    當服務呼叫超過允許的併發度後,直接丟擲RpcException異常。    接下來原始碼分析ExecuteLimitFilter的實現細節。    ExecuteLimitFilter#invoke
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        URL url = invoker.getUrl();
        String methodName = invocation.getMethodName();
        Semaphore executesLimit = null;
        boolean acquireResult = false;
        int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0
); // @1 if (max > 0) { RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName()); // @2 executesLimit = count.getSemaphore(max); // @3 if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) { // @4
throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited."); } } boolean isSuccess = true; try { Result result = invoker.invoke(invocation); // @5 return result; } catch (Throwable t) { isSuccess = false; if (t instanceof RuntimeException) { throw (RuntimeException) t; } else { throw new RpcException("unexpected exception when ExecuteLimitFilter", t); } } finally { if(acquireResult) { // @6 executesLimit.release(); } } }

   程式碼@1:從服務提供者列表中獲取引數executes的值,如果該值小於等於0,表示不啟用併發度控制,直接沿著呼叫鏈進行呼叫。    程式碼@2:根據服務提供者url和服務呼叫方法名,獲取RpcStatus。

public static RpcStatus getStatus(URL url, String methodName) {
        String uri = url.toIdentityString();      
        ConcurrentMap<String, RpcStatus> map = METHOD_STATISTICS.get(uri);         
        if (map == null) {
            METHOD_STATISTICS.putIfAbsent(uri, new ConcurrentHashMap<String, RpcStatus>());    
            map = METHOD_STATISTICS.get(uri);
        }
        RpcStatus status = map.get(methodName);          /
        if (status == null) {
            map.putIfAbsent(methodName, new RpcStatus());
            status = map.get(methodName);
        }
        return status;
    }

   這裡是併發容器ConcurrentHashMap的經典使用,從 這裡可以看出ConcurrentMap< String, ConcurrentMap< String, RpcStatus>> METHOD_STATISTICS的儲存結構為 { 服務提供者URL唯一字串:{方法名:RpcStatus} }。    程式碼@3:根據服務提供者配置的最大併發度,建立該服務該方法對應的訊號量物件。

public Semaphore getSemaphore(int maxThreadNum) {
        if(maxThreadNum <= 0) {
            return null;
        }
        if (executesLimit == null || executesPermits != maxThreadNum) {
            synchronized (this) {
                if (executesLimit == null || executesPermits != maxThreadNum) {
                    executesLimit = new Semaphore(maxThreadNum);
                    executesPermits = maxThreadNum;
                }
            }
        }
        return executesLimit;
    }

   使用了雙重檢測來建立executesLimit 訊號量。    程式碼@4:如果獲取不到鎖,並不會阻塞等待,而是直接丟擲RpcException,服務端的策略是快速丟擲異常,供服務呼叫方(消費者)根據叢集策略進行執行,例如重試其他服務提供者。    程式碼@5:執行真實的服務呼叫。    程式碼@6:如果成功申請到訊號量,在服務呼叫結束後,釋放訊號量。    總結:< dubbo:service executes=”“/>的含義是,針對每個服務每個方法的最大併發度。如果超過該值,則直接丟擲RpcException。

   2、原始碼分析ActiveLimitFilter    @Activate(group = Constants.CONSUMER, value = Constants.ACTIVES_KEY )

  • 過濾器作用    消費端呼叫服務的併發控制。
  • 使用場景    控制同一個消費端對服務端某一服務的併發呼叫度,通常該值應該小於< dubbo:service executes=”“/>
  • 阻斷條件    非阻斷,但如果超過允許的併發度會阻塞,超過超時時間後將不再呼叫服務,而是直接丟擲超時。

   原始碼分析ActiveLimitFilter的實現原理:    ActiveLimitFilter#invoke

public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        URL url = invoker.getUrl();
        String methodName = invocation.getMethodName();
        int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);    // @1
        RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());           // @2
        if (max > 0) {                                          
            long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);   // @3
            long start = System.currentTimeMillis();
            long remain = timeout;
            int active = count.getActive();                                                                                                                                          // @4
            if (active >= max) {                                                                                                                                                          // @5
                synchronized (count) {                                                                                                                                                                      
                    while ((active = count.getActive()) >= max) {                                                                                                     
                        try {
                            count.wait(remain);                                                                                                                                      
                        } catch (InterruptedException e) {
                        }
                        long elapsed = System.currentTimeMillis() - start;                               
                        remain = timeout - elapsed;
                        if (remain <= 0) {                                                                                                                                             // @6
                            throw new RpcException("Waiting concurrent invoke timeout in client-side for service:  "
                                    + invoker.getInterface().getName() + ", method: "
                                    + invocation.getMethodName() + ", elapsed: " + elapsed
                                    + ", timeout: " + timeout + ". concurrent invokes: " + active
                                    + ". max concurrent invoke limit: " + max);
                        }
                    }
                }
            }
        }
        try {
            long begin = System.currentTimeMillis();
            RpcStatus.beginCount(url, methodName);        // @7
            try {
                Result result = invoker.invoke(invocation);     // @8
                RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);    // @9
                return result;
            } catch (RuntimeException t) {
                RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
                throw t;
            }
        } finally {
            if (max > 0) {
                synchronized (count) {
                    count.notify();     // @10
                }
            }
        }
    }

   程式碼@1:從Invoker中獲取訊息端URL中的配置的actives引數,為什麼從Invoker中獲取的Url是消費端的Url呢?這是因為在消費端根據服務提供者URL建立呼叫Invoker時,會用服務提供者URL,然後合併消費端的配置屬性,其優先順序 -D > 消費端 > 服務端。其程式碼位於:、    RegistryDirectory#toInvokers    URL url = mergeUrl(providerUrl);    程式碼@2:根據服務提供者URL和呼叫服務提供者方法,獲取RpcStatus。    程式碼@3:獲取介面呼叫的超時時間,預設為1s。    程式碼@4:獲取當前消費者,針對特定服務,特定方法的併發呼叫度,active值。    程式碼@5:如果當前的併發 呼叫大於等於允許的最大值,則針對該RpcStatus申請鎖,並呼叫其wait(timeout)進行等待,也就是在介面呼叫超時時間內,還是未被喚醒,則直接丟擲超時異常。    程式碼@6:判斷被喚醒的原因是因為等待超時,還是由於呼叫結束,釋放了”名額“,如果是超時喚醒,則直接丟擲異常。    程式碼@7:在一次服務呼叫前,先將 服務名+方法名對應的RpcStatus的active加一。    程式碼@8:執行RPC服務呼叫。    程式碼@9:記錄成功呼叫或失敗呼叫,並將active減一。    程式碼@10:最終成功執行,如果開啟了actives機制(dubbo:referecnce actives=”“)時,喚醒等待者。    總結:< dubbo:reference actives=”“/> 是控制消費端對 單個服務提供者單個服務允許呼叫的最大併發度。該值的取值不應該大於< dubbo:service executes=”“/>的值,並且如果消費者機器的配置,如果效能不盡相同,不建議對該值進行設定。