1. 程式人生 > >dubbo原始碼閱讀之服務目錄

dubbo原始碼閱讀之服務目錄

服務目錄

服務目錄對應的介面是Directory,這個接口裡主要的方法是

List<Invoker<T>> list(Invocation invocation) throws RpcException;

列出所有的Invoker,對於服務消費端而言,一個Invoker對應一個可用的服務提供者,底層封裝了一個tcp連線。當然Invoker也可以是巢狀的,一個Invoker內包含了多個實際的Invoker。通過Cluster物件將一個服務目錄封裝成一個Invoker,內部包含了故障轉移,服務路由,負載均衡,等等相關的叢集邏輯。
回到服務目錄,主要包括兩種服務目錄,StaticDirectory,RegistryDirectory。

  • StaticDirectory。靜態服務目錄,顧名思義,這個目錄在建立的時候就會通過構造方法傳進一個Invoker列表,在之後過程中這個列表不再變化。
  • RegistryDirectory。通過監聽註冊中心的服務提供者資訊動態更新Invoker列表的服務目錄。

從上節服務引入,我們知道,不論是StaticDirectory還是RegistryDirectory,最終都會通過Cluster.join方法封裝為一個Invoker。由於靜態服務目錄的邏輯很簡單,這裡不再贅述,本節我們主要分析一下注冊中心的服務目錄。

RegistryDirectory概述

這個類除了繼承了AbstractDirectory,還實現了NotifyListener介面。NotifyListener介面是一個監聽類,用於監聽註冊中心配置資訊的變更事件。我們首先簡單看一下RegistryDirectory中實現Directory介面的部分程式碼。

AbstractDirectory.list

list方法的實現放在抽象類AbstractDirectory中,

public List<Invoker<T>> list(Invocation invocation) throws RpcException {
    if (destroyed) {
        throw new RpcException("Directory already destroyed .url: " + getUrl());
    }

    return doList(invocation);
}

wishing就是一個狀態的判斷。doList是一個模板方法,由子類實現。

RegistryDirectory.doList

@Override
public List<Invoker<T>> doList(Invocation invocation) {
    // 當狀態量forbidden為true時,服務呼叫被禁止
    // 什麼時候forbidden為true呢??當url只有一個,且協議名稱為empty時,就以為這沒有服務提供者可用。
    if (forbidden) {
        // 1. No service provider 2. Service providers are disabled
        throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "No provider available from registry " +
                getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " +
                NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() +
                ", please check status of providers(disabled, not registered or in blacklist).");
    }

    // 服務分組
    if (multiGroup) {
        return this.invokers == null ? Collections.emptyList() : this.invokers;
    }

    List<Invoker<T>> invokers = null;
    try {
        // Get invokers from cache, only runtime routers will be executed.
        // 從快取中取出Invoker列表,並經由服務路由獲取相應的Invoker
        invokers = routerChain.route(getConsumerUrl(), invocation);
    } catch (Throwable t) {
        logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
    }


    // FIXME Is there any need of failing back to Constants.ANY_VALUE or the first available method invokers when invokers is null?
    /*Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
    if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
        String methodName = RpcUtils.getMethodName(invocation);
        invokers = localMethodInvokerMap.get(methodName);
        if (invokers == null) {
            invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
        }
        if (invokers == null) {
            Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
            if (iterator.hasNext()) {
                invokers = iterator.next();
            }
        }
    }*/
    return invokers == null ? Collections.emptyList() : invokers;
}

這個方法的主要邏輯是,首先判斷服務是否可用(根據forbidden狀態變數)。然後從路由鏈中取出Invoker列表。由於服務路由並不是本節的重點,所以我們只是簡單第看一下RouterChain.route方法

RouterChain.route

public List<Invoker<T>> route(URL url, Invocation invocation) {
    List<Invoker<T>> finalInvokers = invokers;
    for (Router router : routers) {
        finalInvokers = router.route(finalInvokers, url, invocation);
    }
    return finalInvokers;
}

一次呼叫路由列表中的路由規則,最終返回經過多個路由規則路由過的Invoker列表。類似於責任鏈模式,有點像web容器的過濾器,或者是spring-mvc中的攔截器,都是一個鏈式的呼叫。
實際上我們平時一般較少使用到路由功能,所以這裡routers列表實際上是空的,這種情況下不用經過任何路由,直接原樣返回Invokers列表。而至於RouterChain內部的invokers成員是哪來的,RegistryDirectory監聽註冊中心發生變更後重新整理本地快取中的Invokers列表,並將其注入到RouterChain物件中,我們後面會講到。

RegistryDirectory.notify

接下來我們分析RegistryDirectory中最重要的方法,也就是監聽方法,用於監聽註冊中心的變更事件。

public synchronized void notify(List<URL> urls) {
    // 將監聽到的url分類,
    // 按照協議名稱或者category引數分為configurators,routers,providers三類
    Map<String, List<URL>> categoryUrls = urls.stream()
            .filter(Objects::nonNull)
            .filter(this::isValidCategory)
            .filter(this::isNotCompatibleFor26x)
            .collect(Collectors.groupingBy(url -> {
                if (UrlUtils.isConfigurator(url)) {
                    return CONFIGURATORS_CATEGORY;
                } else if (UrlUtils.isRoute(url)) {
                    return ROUTERS_CATEGORY;
                } else if (UrlUtils.isProvider(url)) {
                    return PROVIDERS_CATEGORY;
                }
                return "";
            }));

    // 如果有變化的configurators類別的url,那麼將其轉化為引數並設到成員變數configurators
    List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
    this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);

    // 如果有變更的路由資訊url,那麼將其轉化為Router物件並覆蓋原先的路由資訊
    List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
    toRouters(routerURLs).ifPresent(this::addRouters);

    // providers
    // 最後處理最重要的服務提供者變更資訊,並用這些url重新整理當前快取的Invoker
    List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
    refreshOverrideAndInvoker(providerURLs);
}

首先將從註冊中心獲取到的最新的url進行分類,根據協議名稱或者category引數將url分為三類:configurators, routers, providers,

  • configurators型別的url被轉換為Configurator列表,覆蓋本地快取
  • routers型別的url被轉換為Router列表,並被設定到routerChain物件中
  • providers型別的url則被用於接下來的建立Invoker

RegistryDirectory.refreshOverrideAndInvoker

private void refreshOverrideAndInvoker(List<URL> urls) {
    // mock zookeeper://xxx?mock=return null
    // 用變更的配置資訊覆蓋overrideDirectoryUrl成員變數
    overrideDirectoryUrl();
    // 重新整理快取中的Invokers
    refreshInvoker(urls);
}

overrideDirectoryUrl方法的作用主要是用從註冊中心以及配置中心監聽到的變更的配置覆蓋本地的overrideDirectoryUrl成員變數中的配置。我們接著往下走。

RegistryDirectory.refreshInvoker

// 入參invokerUrls是從註冊中心拉取的服務提供者url
private void refreshInvoker(List<URL> invokerUrls) {
    Assert.notNull(invokerUrls, "invokerUrls should not be null");

    // 如果只有一個服務提供者,並且協議名稱是empty,說明無提供者可用
    // 將狀態forbidden設為true, invokers設為空列表
    if (invokerUrls.size() == 1
            && invokerUrls.get(0) != null
            && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
        this.forbidden = true; // Forbid to access
        this.invokers = Collections.emptyList();
        routerChain.setInvokers(this.invokers);
        destroyAllInvokers(); // Close all invokers
    } else {
        this.forbidden = false; // Allow to access
        // 記下舊的Invoker列表
        Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
        if (invokerUrls == Collections.<URL>emptyList()) {
            invokerUrls = new ArrayList<>();
        }
        // 如果從註冊中心沒有拉取到服務提供者資訊,那麼使用之前快取的服務提供者資訊
        // 這就是為什麼dubbo在註冊中心掛了之後消費者仍然能夠呼叫提供者,因為消費者在本地進行了快取
        if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
            invokerUrls.addAll(this.cachedInvokerUrls);
        } else {
            this.cachedInvokerUrls = new HashSet<>();
            this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
        }
        // 如果註冊中心沒有提供者資訊,並且本地也沒有快取,那麼就沒法進行服務呼叫了
        if (invokerUrls.isEmpty()) {
            return;
        }
        // 將服務提供者url轉化為Invoker物件存放到map中
        Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map

        /**
         * If the calculation is wrong, it is not processed.
         *
         * 1. The protocol configured by the client is inconsistent with the protocol of the server.
         *    eg: consumer protocol = dubbo, provider only has other protocol services(rest).
         * 2. The registration center is not robust and pushes illegal specification data.
         *
         */
        if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
            logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls
                    .toString()));
            return;
        }

        List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
        // pre-route and build cache, notice that route cache should build on original Invoker list.
        // toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed.
        // 將生成的Invoker列表設定到routerChain的快取中,
        // routerChain將對這些Invoker進行路由
        routerChain.setInvokers(newInvokers);
        // 處理服務分組的情況
        this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
        // 將快取的Invoker設定為新生成的
        this.urlInvokerMap = newUrlInvokerMap;

        try {
            // 這裡實際上求新的Invoker列表和舊的差集,將不再使用的舊的Invoker銷燬
            destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
        } catch (Exception e) {
            logger.warn("destroyUnusedInvokers error. ", e);
        }
    }
}
  • 這個方法首先根據監聽到的提供者url列表判斷是否處於服務禁用狀態,判斷依據是:如果只有一個url,並且該url協議名稱是empty,說明無提供者可用,將forbidden變數設為true,即禁止服務呼叫,
    並做一下其他的相關設定以及銷燬快取中的Invoker。

  • 如果不是禁止狀態,繼續往下走。如果從註冊中心獲取到的url列表為空,那麼檢查本地快取的url列表是否為空,如果快取不為空就用快取的列表。如果本地快取也為空,說明無服務可用,直接返回。
  • 如果如果從註冊中心獲取到的url列表不為空,說明有服務可用,這時就不會再去嘗試本地快取了(因為快取已經過期了),並且將本地快取更新為新獲取的url列表。
  • 將可用的提供者url列表轉化為Invoker列表。
  • 將新建立的Invoker列表設定到routerChain中,這裡呼應了前文提到的在doList方法中,從routerChain物件中取出快取的Invoker列表。
  • 將本地快取的url->Invoker map更新為新建立的。
  • 最後銷燬快取中不再使用的Invoker

RegistryDirectory.toInvokers

/**
 * Turn urls into invokers, and if url has been refer, will not re-reference.
 *
 * @param urls 從註冊中心拉取的服務提供者資訊
 * @return invokers
 */
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
    Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
    if (urls == null || urls.isEmpty()) {
        return newUrlInvokerMap;
    }
    // 用於防止對相同的url重複建立Invoker
    Set<String> keys = new HashSet<>();
    String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
    for (URL providerUrl : urls) {
        // If protocol is configured at the reference side, only the matching protocol is selected
        // 如果消費端配置了協議名稱,那麼只有符合條件的提供者url才會被使用
        // 這段程式碼有待商榷 ,應該先把queryProtocols處理好,避免重複做同樣的工作
        if (queryProtocols != null && queryProtocols.length() > 0) {
            boolean accept = false;
            String[] acceptProtocols = queryProtocols.split(",");
            for (String acceptProtocol : acceptProtocols) {
                if (providerUrl.getProtocol().equals(acceptProtocol)) {
                    accept = true;
                    break;
                }
            }
            if (!accept) {
                continue;
            }
        }
        // 如果協議名稱是empty,那麼忽略該條url
        if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
            continue;
        }
        // 如果當前classpath下找不到與提供者url中協議名稱相對應的Protocol類,那麼列印錯誤日誌同時忽略該條url
        if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
            logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() +
                    " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() +
                    " to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +
                    ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
            continue;
        }
        // 合併消費端設定的引數以及從註冊中心,配置中心監聽到的配置變更
        URL url = mergeUrl(providerUrl);

        // 以全路徑作為該url的唯一標識
        String key = url.toFullString(); // The parameter urls are sorted
        if (keys.contains(key)) { // Repeated url
            continue;
        }
        keys.add(key);
        // Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
        Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
        // 如果之前已經建立過該url的Invoker物件,那麼就不用再重複建立
        Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
        if (invoker == null) { // Not in the cache, refer again
            try {
                boolean enabled = true;
                // 檢查disabled和enabled引數的值
                if (url.hasParameter(Constants.DISABLED_KEY)) {
                    enabled = !url.getParameter(Constants.DISABLED_KEY, false);
                } else {
                    enabled = url.getParameter(Constants.ENABLED_KEY, true);
                }
                if (enabled) {
                    // 真正建立Invoker的地方,
                    // InvokerDelegate只是個簡單的包裝類,不需要多說
                    invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
                }
            } catch (Throwable t) {
                logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
            }
            if (invoker != null) { // Put new invoker in cache
                newUrlInvokerMap.put(key, invoker);
            }
        } else {
            newUrlInvokerMap.put(key, invoker);
        }
    }
    keys.clear();
    return newUrlInvokerMap;
}
  • 首先根據協議名稱檢查url是否可用。url的協議必須在本地配置的協議列表中(如果沒有配置就不需要做此檢查);如果協議名稱是empty則忽略這個url;如果當前classpath下找不到與提供者url中協議名稱相對應的Protocol類,那麼列印錯誤日誌同時忽略該條url
  • 合併消費端設定的引數以及從註冊中心,配置中心監聽到的配置變更
  • 檢查disabled,enabled引數的值,判斷該url是否啟用,如果disabled為true則跳過該url;如果沒有disabled引數,檢查enabled引數,如果enabled為false則跳過該url,enabled預設是true。
  • 呼叫Protocol.refer方法建立Invoker物件。

這裡需要說明一下,由於Directory不是通過SPI機制載入的,所以RegistryDirectory也不是通過ExtensionLoader載入的,所以也就不會受到ExtensionLoader的IOC影響。RegistryDirectory內部的protocol成員是在RegistryDirectory初始化之後通過呼叫setter方法設定進去的,是在RegistryProtocol.doRefer方法中完成的。而RegistryProtocol是通過ExtensionLoader機制載入的,會受到IOC影響,所以RegistryProtocol例項內部的protocol成員是通過ExtensionLoader的IOC機制自動注入的,是一個自適應的擴充套件類。

另外,InvokerDelegate只是個簡單的包裝類,不需要多說。
Invoker的建立最終還是通過protocol.refer方法,我們以最常用的dubbo協議為例進行分析。

DubboProtocol.refer

@Override
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
    optimizeSerialization(url);

    // create rpc invoker.
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    invokers.add(invoker);

    return invoker;
}

這個方法很簡單,直接new了一個DubboInvoker。

DubboInvoker

看一下doInvoke方法,這個方法主要是處理了同步,非同步,超時,單向呼叫等引數,並且對呼叫結果封裝了非同步呼叫,同步呼叫的邏輯。
真正執行遠端呼叫的部分是靠ExchangeClient實現的,再往下就是呼叫引數的序列化,tcp連線建立,傳送報文,獲取響應報文,反序列化結果等的邏輯了,本文不再深入下去