1. 程式人生 > >Dubbo學習筆記10:Dubbo服務消費方啟動流程源碼分析

Dubbo學習筆記10:Dubbo服務消費方啟動流程源碼分析

exec checked 自己 當前 In rpc mod png collect

同理我們看下服務消費端啟動流程時序圖:

技術分享圖片

在《Dubbo整體架構分析》一文中,我們提到服務消費方需要使用ReferenceConfig API來消費服務,具體是調用代碼(1)get()方法來生成遠程調用代理類。get()方法最終會調用createProxy方法來具體創建代理類,其中createProxy結合時序圖的核心代碼如下:

@SuppressWarnings({"unchecked" , "rawtypes" , "deprecation"})
private T createProxy(Map<String , String> map){
    ...
    if(isJvmRefer){
        ...
    }
else{ ... // (1) 當只配置一個服務中心的時候 if(urls.size() == 1){ invoker = refprotocol.refer(interfaceClass , urls.get(0)); }else{ // (2) 多個服務中心的時候 ... } } ... // (3) 創建代理服務 return (T)proxyFactory.getProxy(invoker); }

其中refprotocol的定義如下:

private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

可知refprotocol是Protocol擴展接口的適配器類,這裏調用refprotocol.refer(interfaceClass , urls.get(0));實際調用的是Protocol$Adaptive的refer方法。從Protocol$Adaptive的refer方法內部,我們可以發現當前協議類型為registry,也就是這裏需要調用RegistryProtocol的refer方法,但是RegistryProtocol被QosProtocolWrapper / ProtocolFilterWrapper / ProtocolListenerWrapper三個wrapper類增強了。所以這裏經過一層層調用後,最後調用到了RegistryProtocol的refer方法,其內部主要是調用了doRefer方法,doRefer代碼如下:

private <T> Invoker<T> doRefer(Cluster cluster , Registry registry , Class<T> type , URL url){
    ...
    // (3)
    directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY , Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY));
    // (4)
    return cluster.join(directory);
}

如上代碼(3)的作用是向服務註冊中心訂閱服務提供者的服務,代碼(4)則是調用擴展接口Cluster的適配器類的join方法,根據參數選擇配置的集群容錯策略。這裏我們先講講代碼(3)的邏輯,看看如何把服務消費方遠程服務轉換到Invoker,這裏結合Zookeeper作為服務治理中心來講解,首先看看時序圖:

技術分享圖片

如上時序圖步驟(2)(3)(4),從Zookeeper獲取服務提供者的地址列表,等Zookeeper返回地址列表後會調用RegistryDirectory的notify方法,代碼(6)(7)(8)根據獲取的最新的服務提供者url地址轉換為具體的invoker列表,也就是每個提供者的url會被轉換為一個Invoker對象,具體轉換在toInvokers方法中進行:

private Map<String , Invoker<T>> toInvokers(List<URL> urls){
    Map<String , Invoker<T>> newUrlInvokerMap = new HashMap<String , Invoker<T>>();
    ...
    for(URL providerUrl : urls){
        ...
        Map<String , Invoker<T>> localUrlInvokerMap = this.urlInvokerMap;    //
        Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
        if(invoker == null){
            try{
                ...
                if(enabled){
                    // 這裏具體調用dubbo協議轉換服務為invoker
                    invoker = new InvokerDelegete<T>(protocol.refer(serviceType , url) , url , providerUrl);
                }
            }catch(Throwable t){
                
            }
            if(invoker != null){
                newUrlInvokerMap.put(key , invoker);
            }
        }else{
            newUrlInvokerMap.put(key , invoker);
        }
    }
    return newUrlInvokerMap;
}

如上代碼,具體轉換服務到invoker對象是通過調用protocol.refer(serviceType , url)來完成的,這裏的protocol對象也是Protocol擴展接口的適配器對象,所以調用protocol.refer實際是調用Protocol$Adaptive的refer方法。url中協議默認為dubbo,所以適配器裏調用的應該是DubboProtocol的refer方法。

前面章節也講過,Dubbo默認提供了一系列Wrapper類對擴展實現類進行了功能增強,當然這裏也不例外,Dubbo使用了ProtocolListenerWrapper / ProtocolFilterWrapper等類對DubboProtocol進行了功能增強。所以這裏經過一次次調用後才調用到DubboProtocol的refer方法,DubboProtocol的refer代碼內容如下:

public <T> Invoker<T> refer(Class<T> serviceType , URL url) throws RpcException{
    // create rpc invoker.
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType , url ,getClients(url) , invokers);
    invokers.add(invoker);
    return invoker;
}

如上代碼,首先getClients方法從(12)到(18)創建服務消費端的NettyClient對象用來連接服務提供者。另外可知refer方法內部返回了一個DubboInvoker,這個就是原生的invoker對象,服務方遠程服務轉換就是為了這個invoker。代碼(12)則對這個invoker進行裝飾,使用一系列filter形成了責任鏈,invoker被放到責任鏈的末尾,下面看看buildInvokerChain如何形成責任鏈,代碼如下:

private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker , String key , String group){
    Invoker<T> last = invoker;
    // 獲取所有激活的filter,然後使用鏈表方式形成責任鏈
    List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl() , key , group);
    if(filters.size() > 0){
        for(int i=filters.size()-1 ; i>=0 ; i--){
            final Filter filter = filters.get(i);
            final Invoker<T> next = last;
            last = new Invoker<T>(){
                public Class<T> getInterface(){
                    return invoker.getInterface();
                }
                public URL getUrl(){
                    return invoker.getUrl();
                }
                public boolean isAvailable(){
                    return invoker.isAvailable();
                }
                public Result invoke(Invocation invocation) throws RpcException{
                    return filter.invoke(next , invocation);    
                }
                public void destroy(){
                    invoker.destroy();
                }
                @Override
                public String toString(){
                    return invoker.toString();
                }
            };
        }
    }
    return last;
}

其中擴展接口Filter對應的實現類,如下所示:

技術分享圖片

其中MonitorFilter和監控中心進行交互,FutureFilter用來實現異步調用,GenericFilter用來實現泛化調用,ActiveLimitFilter用來控制消費端最大並發調用量,ExecuteLimitFilter用來控制服務提供方最大並發處理量等,當然你可以寫自己的filter。由於是責任鏈,所以ProtocolFilterWrapper的refer返回的是責任鏈頭部的filter到ProtocolListenerWrapper,而ProtocolListenerWrapper的refer方法內容如下:

public <T> Invoker<T> refer(Class<T> type , URL url) throws RpcException{
    ...
    return new ListenerInvokerWrapper<T>(protocol.refer(type,url) , Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(InvokerListener.class).getActivateExtension(url , Constants.INVOKER_LISTENER_KEY)));
}

ProtocolListenerWrapper的refer返回的是ListenerInvokerWrapper對象,所以代碼(9)返回的也是url對應的ListenerInvokerWrapper對象,然後再回到時序圖代碼(8)的toInvokers方法,可知toInvokers返回的是使用InvokerDelegete對ListenerInvokerWrapper包裹的對象。到這裏RegistryDirectory裏就維護了所有服務者的invoker列表,消費端發送信息時就是根據集群容錯和負載均衡算法從invoker列表裏選擇一個進行調用,當服務提供者掛了的時候,Zookeeper會通知更新這invoker列表。

到這裏我們講完了本節第一個時序圖中的步驟(3),下面我們接著講解步驟(4) 。

默認下步驟(4)會調用FailoverCluster的join方法,FailoverCluster的join方法代碼如下:

public class FailoverCluster implements Cluster{
    private static final String NAME = "failover";
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException{
        return new FailoverClusterInvoker<T>(directory);
    }
}

這裏把directory對象包裹到了FailoverClusterInvoker,這裏需要註意下directory就是上面講解的RegistryDirectory,其內部維護了所有服務提供者的invoker列表,而FailoverCluster就是集群容錯策略。

其實Dubbo對cluster擴展接口實現類使用wrapper類MockClusterWrapper進行增強,這個從下圖可以得到證明:

技術分享圖片

實際上的調用時序圖如下圖所示:

技術分享圖片

該時序圖中(3)返回了FailbackClusterInvoker對象到(2),下面看看MockClusterWrapper的代碼:

public class MockClusterWrapper implements Cluster{
    private Cluster cluster;
    public MockClusterWrapper(Cluster cluster){
        this.cluster = cluster;
    }
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException{
        return new MockClusterInvoker<T>(directory , this.cluster.join(directory));
    }
}

可知MockClusterWrapper類把FailoverClusterInvoker包裝成了MockClusterInvoker實例,所以整個調用鏈最終調用返回的是MockClusterInvoker對象。也就是本文第一個時序圖步驟(4)返回的是MockClusterWrapper。然後執行代碼(13)獲取MockClusterInvoker的代理實現invoker到客戶端接口的轉換,這裏默認調用的是JavassistProxyFactory的getProxy方法,代碼如下:

public <T> T getProxy(Invoker<T> invoker , Class<?>[] interfaces){
    return (T)Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}

其中InvokerInvocationHandler為具體攔截器。至此我們按照逆序的方式把服務消費端啟動流程講解完畢,下面在順過來講解一次遠程調用過程。

Dubbo服務消費端一次調用流程原理分析

同理先上時序圖:

技術分享圖片

由於服務消費端通過ReferenceConfig的get()方法返回的是一個代理類,並且方法攔截器為InvokerInvocationHandler,所以當調用了服務的方法後會被InvokerInvocationHandler攔截,執行如上時序圖流程。

如上流程,首先步驟(1)(2)(3)調用了集群容錯策略FailoverClusterInvoker,其內部首先根據設置的負載均衡策略選擇一個invoker作為FailoverClusterInvoker具體的遠程調用者,如果調用發生異常,則根據FailoverClusterInvoker的策略重新選擇一個invoker進行調用。

FailoverClusterInvoker內每次調用具體invoker的invoke方法都會走到步驟(8)(9),然後步驟(10)(11)(12)是filter內創建的責任鏈,最後調用了原生的DubboInvoker,具體使用nettyclient與服務提供者進行交互。

  

Dubbo學習筆記10:Dubbo服務消費方啟動流程源碼分析