1. 程式人生 > >【Dubbo源碼閱讀系列】服務暴露之遠程暴露

【Dubbo源碼閱讀系列】服務暴露之遠程暴露

3ds 不難 [] wrapper 重點 client 工廠 特定 講解

引言

什麽叫 遠程暴露 ?試著想象著這麽一種場景:假設我們新增了一臺服務器 A,專門用於發送短信提示給指定用戶。那麽問題來了,我們的 Message 服務上線之後,應該如何告知調用方服務器,服務器 A 提供了 Message 功能?那麽我們是不是可以把目前已提供的服務暴露在一個地方,讓調用方知道某臺機器提供了某個特定功能?帶著這樣的假設,我們今天就來聊聊 Dubbo 服務暴露之遠程暴露!!

服務遠程暴露

先回顧一下上篇文章,上篇文章我們聊到了 ServiceConfig 的 export() 方法,並且對服務的本地暴露內容進行了分析,今天我們接著這塊內容講講服務暴露之遠程暴露。

// export to remote if the config is not local (export to local only when config is local)
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    ...
    if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) {
        if (logger.isInfoEnabled()) {
            logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
        }
        if (registryURLs != null && !registryURLs.isEmpty()) {
            for (URL registryURL : registryURLs) {
                url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
                // 為了幫助大家閱讀,省略部分代碼...
                Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                Exporter<?> exporter = protocol.export(wrapperInvoker);
                exporters.add(exporter);
            }
        } else {
            ...
        }
    }
    ...
}

這裏我們只關註核心代碼:

Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);

invoker 對象的構建

先來看看 invoker 對象是怎麽創建的!這裏涉及到了 Dubbo SPI 機制,調用流程大致為
StubProxyFactoryWrapper.getInvoker() ==> JavassistProxyFactory.getInvoker()
詳細看下 JavassistProxyFactory 類的 getInvoker 方法

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
    final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName,
                Class<?>[] parameterTypes,
                Object[] arguments) throws Throwable {
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}

值得我們重點註意的是 Wrapper 類的 getWrapper() 方法!!

public static Wrapper getWrapper(Class<?> c) {
    while (ClassGenerator.isDynamicClass(c)) // can not wrapper on dynamic class.
    {
        c = c.getSuperclass();
    }

    if (c == Object.class) {
        return OBJECT_WRAPPER;
    }

    Wrapper ret = WRAPPER_MAP.get(c);
    if (ret == null) {
        ret = makeWrapper(c);
        WRAPPER_MAP.put(c, ret);
    }

    return ret;
}

這裏會使用參數 c 作為 key 值從 WRAPPER_MAP 緩存中取值,如果沒有對應的 value 值,會調用 makeWrapper() 方法借助 javassist 技術構建一個 Wrapper 包裝類。假設當前參數 c 的值為 demoService,那麽最後生成的動態類為:

public class Wrapper0 extends Wrapper implements DC {
    public static String[] pns;
    public static Map pts;
    public static String[] mns;
    public static String[] dmns;
    public static Class[] mts0;

    public Wrapper0() {
    }

    public Class getPropertyType(String var1) {
        return (Class)pts.get(var1);
    }

    public Object invokeMethod(Object var1, String var2, Class[] var3, Object[] var4) throws InvocationTargetException {
        DemoService var5;
        try {
            var5 = (DemoService)var1;
        } catch (Throwable var8) {
            throw new IllegalArgumentException(var8);
        }

        try {
            if("sayHello".equals(var2) && var3.length == 1) {
                return var5.sayHello((String)var4[0]);
            }
        } catch (Throwable var9) {
            throw new InvocationTargetException(var9);
        }

        throw new NoSuchMethodException("Not found method \"" + var2 + "\" in class org.apache.dubbo.demo.DemoService.");
    }

    public String[] getPropertyNames() {
        return pns;
    }

    public Object getPropertyValue(Object var1, String var2) {
        try {
            DemoService var3 = (DemoService)var1;
        } catch (Throwable var5) {
            throw new IllegalArgumentException(var5);
        }

        throw new NoSuchPropertyException("Not found property \"" + var2 + "\" field or setter method in class org.apache.dubbo.demo.DemoService.");
    }

    public void setPropertyValue(Object var1, String var2, Object var3) {
        try {
            DemoService var4 = (DemoService)var1;
        } catch (Throwable var6) {
            throw new IllegalArgumentException(var6);
        }

        throw new NoSuchPropertyException("Not found property \"" + var2 + "\" field or setter method in class org.apache.dubbo.demo.DemoService.");
    }

    public String[] getDeclaredMethodNames() {
        return dmns;
    }

    public boolean hasProperty(String var1) {
        return pts.containsKey(var1);
    }

    public String[] getMethodNames() {
        return mns;
    }
}

最後再回到 JavassistProxyFactory 類的 getInvoker 方法,可以看到它實際返回的是 AbstractProxyInvoker 對象,當調用 AbstractProxyInvoker 類的 doInvoke() 方法時,實際調用的是 wrapper 類的 invokeMethod() 方法!這個知識點十分重要!在我們講 Dubbo 遠程調用的時候會再次回顧這塊內容!

exporter 對象的構建

Exporter<?> exporter = protocol.export(wrapperInvoker);

再來看看後半句代碼。這裏最後會調用 RegistryProtocol 類的 export() 方法,若對此有疑問請看系列文章第一篇:【Dubbo源碼閱讀系列】之 Dubbo SPI 機制,後文不再贅述。 直接看看 RegistryProtocol 的 export() 方法:

RegistryProtocol.export()

public class RegistryProtocol implements Protocol {
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        //export invoker
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
    
        URL registryUrl = getRegistryUrl(originInvoker);
    
        //registry provider
        final Registry registry = getRegistry(originInvoker);
        final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);
    
        //to judge to delay publish whether or not
        boolean register = registeredProviderUrl.getParameter("register", true);
    
        ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
    
        if (register) {
            register(registryUrl, registeredProviderUrl);
            ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
        }
    
        // Subscribe the override data
        // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
        //Ensure that a new exporter instance is returned every time export
        return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
    }
}

RegistryProtocol.export() 方法非常重要!!可以說是服務遠程暴露的核心了。廢話不多說,讓我們逐行來看看吧!

doLocalExport()

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
    // 獲取 providerUrl ,取 originInvoker url.parameters 鍵值對中 key 為 export 的值
    String key = getCacheKey(originInvoker);
    ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
    if (exporter == null) {
        synchronized (bounds) {
            exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
            if (exporter == null) {
                final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
                bounds.put(key, exporter);
            }
        }
    }
    return exporter;
}

先來看看 doLocalExport() 方法做了什麽:

  1. 從 getCacheKey() 方法中獲取到的,鍵 export 對應的 value 在如下代碼中被添加到 url 的 parameters 集合中。然後我們在這裏取出對應的值。
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
  1. 嘗試從 bounds 緩存中取對應當前鍵 key 的 exporter。
  2. 如果緩存為 null,新建 exporter 並返回。這裏的 protocl 對象為 Protocol$Adaptive。不難分析最後執行的實際是 DubboProtocol 的 export() 方法。

總結一下:doLocalExport() 用 ExporterChangeableWrapper 代理類包裝了 protocol.export() 方法返回的 exporter 對象,最後放到了 bounds 集合中緩存。

DubboPrtocol.export()

DubboProtocol.java
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();

    // export service.
    String key = serviceKey(url);
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    exporterMap.put(key, exporter);

    //export an stub service for dispatching event
    Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
    Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
    if (isStubSupportEvent && !isCallbackservice) {
        String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
        if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
            if (logger.isWarnEnabled()) {
                logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                        "], has set stubproxy support event ,but no stub methods founded."));
            }
        } else {
            stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
        }
    }

    openServer(url);
    optimizeSerialization(url);
    return exporter;
}

接著上文繼續看 DubboProtocol.export() 方法是如何創建 exporter 對象的:

  1. 調用 serviceKey() 方法構建服務的 key 值,最後的獲得的 key 值形式類似 group/path:version:port
  2. 新建 DubboExporter
  3. openServer(url),此時的 url 為 RegistryProtocol 傳遞過來的 providerUrl,openServer() 用途我們在後文分析;
  4. optimizeSerialization(url) 序列化操作,本文不做具體分析
    DubboProtocol.export() 返回的對象為 DubboExporter。值得我們註意是後面的 openServer() 方法!

    openServer()

private void openServer(URL url) {
    // find server.
    String key = url.getAddress();
    //client can export a service which's only for server to invoke
    boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
    if (isServer) {
        ExchangeServer server = serverMap.get(key);
        if (server == null) {
            synchronized (this) {
                server = serverMap.get(key);
                if (server == null) {
                    serverMap.put(key, createServer(url));
                }
            }
        } else {
            // server supports reset, use together with override
            server.reset(url);
        }
    }
}

openServer() 光從方法名看起來像是開啟服務連接的。方法比較簡單,取 url 的 address 作為 key,嘗試從 serverMap 獲取對應的 value 值。如果 value 值為 null 則調用 createServer(url) 方法創建 server 後添加到 serverMap 中。
createServer() 方法的流程比較冗長,我們這裏通過一張時序圖來給出該方法內部調用流程:

技術分享圖片

上圖省略了從 ServiceConfigRegistryProtocol 以及從 RegistryProtocolDubboProtocol 的轉換過程。這部分內容涉及到 Dubbo SPI 機制,如有疑問可以詳見:【Dubbo源碼閱讀系列】之 Dubbo SPI 機制。這裏給出簡單的轉換流程

  • ServiceConfig 到 RegistryProtocol
    Protocol$Adaptive ==》ProtocolFilterWrapper ==》ProtocolListenerWrapper ==》RegistryProtocol
  • RegistryProtocol 到 DubboProtocol
    Protocol$Adaptive ==》ProtocolFilterWrapper ==》ProtocolListenerWrapper ==》DubboProtocol

最後重點關註下 NettyServer 的 doOpen() 方法:

protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();
    ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
    ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
    ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
    bootstrap = new ServerBootstrap(channelFactory);

    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    channels = nettyHandler.getChannels();
    // https://issues.jboss.org/browse/NETTY-365
    // https://issues.jboss.org/browse/NETTY-379
    // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
    bootstrap.setOption("child.tcpNoDelay", true);
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        @Override
        public ChannelPipeline getPipeline() {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
            ChannelPipeline pipeline = Channels.pipeline();
            /*int idleTimeout = getIdleTimeout();
            if (idleTimeout > 10000) {
                pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
            }*/
            pipeline.addLast("decoder", adapter.getDecoder());
            pipeline.addLast("encoder", adapter.getEncoder());
            pipeline.addLast("handler", nettyHandler);
            return pipeline;
        }
    });
    // bind
    channel = bootstrap.bind(getBindAddress());
}

可以看到這段代碼是比較經典的 Netty 服務端啟動代碼...也就是說 openServer() 方法用於 Netty 服務端啟動。
我們知道 Netty 常用於客戶端和服務端之間的通訊。在這裏我們開啟了服務端,那麽在何處會開啟對應的客戶端呢?他們之間到底會進行什麽交互呢?這個疑問我們先留著待後續文章講解。

服務的暴露

上面講了這麽多,感覺還是和服務遠程暴露沒有沾多大的邊?到底我們的服務是如何被其它機器感知的?別人是怎麽知道我們某某臺機器提供了短信服務的?其實揭秘的序幕已經拉開了!讓我們繼續娓娓道來!
回顧一下 RegistryProtocol.export() 方法:

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    //export invoker
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);

    URL registryUrl = getRegistryUrl(originInvoker);

    //registry provider
    final Registry registry = getRegistry(originInvoker);
    final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);

    //to judge to delay publish whether or not
    boolean register = registeredProviderUrl.getParameter("register", true);

    ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);

    if (register) {
        register(registryUrl, registeredProviderUrl);
        ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
    }

    // Subscribe the override data
    // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    //Ensure that a new exporter instance is returned every time export
    return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
}

上面我們已經聊完了 doLocalExport() 方法,繼續看 export() 方法的後半部分:

RegistryProtocol.java
final Registry registry = getRegistry(originInvoker);

private Registry getRegistry(final Invoker<?> originInvoker) {
    URL registryUrl = getRegistryUrl(originInvoker);
    return registryFactory.getRegistry(registryUrl);
}

這裏的 registryFactory 為 RegistryFactory$Adaptive(Dubbo 源碼中充斥了大量 SPI 擴展機制的使用,這裏不再贅述)。總之我們獲取到的擴展類為 ZookeeperRegistryFactory ,ZookeeperRegistryFactory 繼承自 AbstractRegistryFactory 類。因此最後調用的是 AbstractRegistryFactory 類的 getRegistry() 方法。

@Override
public Registry getRegistry(URL url) {
    url = url.setPath(RegistryService.class.getName())
            .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
            .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
    String key = url.toServiceStringWithoutResolving();
    // Lock the registry access process to ensure a single instance of the registry
    LOCK.lock();
    try {
        Registry registry = REGISTRIES.get(key);
        if (registry != null) {
            return registry;
        }
        registry = createRegistry(url);
        if (registry == null) {
            throw new IllegalStateException("Can not create registry " + url);
        }
        REGISTRIES.put(key, registry);
        return registry;
    } finally {
        // Release the lock
        LOCK.unlock();
    }
}

方法比較簡單,直接看重點方法 createRegistry(url)。createRegistry() 是一個抽象方法,會根據 url 來調用具體的實現方法,這裏我們用 ZookeeperRegistryFactory 類進行分析。

public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
    ...
    public Registry createRegistry(URL url) {
        return new ZookeeperRegistry(url, zookeeperTransporter);
    }
    ...
}

public class ZookeeperRegistry extends FailbackRegistry {
    ...
    public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        super(url);
        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        }
        String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
        if (!group.startsWith(Constants.PATH_SEPARATOR)) {
            group = Constants.PATH_SEPARATOR + group;
        }
        this.root = group;
        zkClient = zookeeperTransporter.connect(url);
        zkClient.addStateListener(new StateListener() {
            @Override
            public void stateChanged(int state) {
                if (state == RECONNECTED) {
                    try {
                        recover();
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
        });
    }
    ...
}

ZookeeperRegistryFactory 類的 createRegistry() 方法會調用 ZookeeperRegistry 類的構造方法新建 ZookeeperRegistry 實例並返回。而 ZookeeperRegistry 類的構造方法會先調用父類 FailbackRegistry 的構造方法再執行後續操作。先看 FailbackRegistry 構造方法:

public abstract class FailbackRegistry extends AbstractRegistry {
    ...
    public FailbackRegistry(URL url) {
        super(url);
        this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
        this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                // Check and connect to the registry
                try {
                    // 延遲重試
                    retry();
                } catch (Throwable t) { // Defensive fault tolerance
                    logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
                }
            }
        }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
    }
    ...
}

在 FailbackRegistry 構造方法中有一個延遲重試方法 retry(),如果發現失敗集合 failedRegistered、failedUnregistered、failedSubscribed、failedUnsubscribed、failedNotified 不為空,會進行重試操作。
繼續看 ZookeeperRegistry 類的構造方法:

public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    ...
    zkClient = zookeeperTransporter.connect(url);
    zkClient.addStateListener(new StateListener() {
        @Override
        public void stateChanged(int state) {
            if (state == RECONNECTED) {
                try {
                    recover();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    });
}

這裏的 ZookeeperTransporter.connect() 經過 SPI 轉換實際調用為 CuratorZookeeperTransporter.connect()。

public class CuratorZookeeperTransporter implements ZookeeperTransporter {
    @Override
    public ZookeeperClient connect(URL url) {
        return new CuratorZookeeperClient(url);
    }
}

public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatcher> {
    private final CuratorFramework client;

    public CuratorZookeeperClient(URL url) {
        super(url);
        try {
            int timeout = url.getParameter(Constants.TIMEOUT_KEY, 5000);
            CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                    .connectString(url.getBackupAddress())
                    .retryPolicy(new RetryNTimes(1, 1000))
                    .connectionTimeoutMs(timeout);
            String authority = url.getAuthority();
            if (authority != null && authority.length() > 0) {
                builder = builder.authorization("digest", authority.getBytes());
            }
            client = builder.build();
            client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
                @Override
                public void stateChanged(CuratorFramework client, ConnectionState state) {
                    if (state == ConnectionState.LOST) {
                        CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
                    } else if (state == ConnectionState.CONNECTED) {
                        CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
                    } else if (state == ConnectionState.RECONNECTED) {
                        CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
                    }
                }
            });
            client.start();
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }
}

上面這段代碼使用 CuratorFrameworkFactory 工廠類創建了一個 CuratorFramework 實例,並啟動該實例創建了一個與 zookeeper 的連接。

再回到 RegistryProtocol 中的 getRegistry() 方法。我們發現它通過層層調用最終創建了一個到 ZookeeperRegistry 實例。這個實例中的 ziClient 對象建立了到 zookeeper 的連接。
我們知道 ZooKeeper 經常被用作註冊中心Ok。那我們現在已經連接上了 ZooKeeper 了,是不是該往 Zookeeper 上寫點啥了?繼續往下看,好戲要來啦!!~

register() 註冊方法

RegistryProtocol.java
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        ...
        if (register) {
            register(registryUrl, registeredProviderUrl);
            ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
        }
        ...
    }
    public void register(URL registryUrl, URL registedProviderUrl) {
        Registry registry = registryFactory.getRegistry(registryUrl);
        registry.register(registedProviderUrl);
    }

在這裏 register() 方法最終會調用 FailbackRegistry 類的 register() 方法(不想再贅述為什麽!!!!)。

public abstract class FailbackRegistry extends AbstractRegistry {
    ...
    public void register(URL url) {
        super.register(url);
        failedRegistered.remove(url);
        failedUnregistered.remove(url);
        try {
            // Sending a registration request to the server side
            doRegister(url);
        } catch (Exception e) {
            // ...
        }
    }
    ...
}

public class ZookeeperRegistry extends FailbackRegistry {
    protected void doRegister(URL url) {
        try {
            String str = toUrlPath(url);
            zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
}

劃重點啊筒子們!!! doRegister() 方法!!這裏調用鏈也比較長。畫個簡圖總結下:

技術分享圖片
小結:總之最後的目的是在 ZooKeeper 上創建通過 url 解析生成的 path 節點。大概長這個樣子:dubbo%3A%2F%2F10.137.32.54%3A20880%2Forg.apache.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26dubbo%3D2.0.2%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D4264%26side%3Dprovider%26timestamp%3D1546848704035
最後還有一個地方需要註意下:這裏調用 zkClient.create() 方法時,如果 dynamic 為空,默認會創建 zookeeper 臨時節點。臨時節點的好處在於如果客戶端和 zookeeper 集群斷開連接,對應的臨時節點則會自動被刪除。這樣一來,是不是對我們的調用方好處多多呢?

End

礙於篇幅限制,今天就先介紹這麽多。回顧一下,我們在 RegistryProtocol.export() 方法裏面創建了一個 DubboExporter 對象、開啟了 Netty 服務端,同時還往註冊中心 zookeeper 上創建了一個和服務有關的臨時節點!關於 RegistryProtocol.export() 方法剩余的內容,我們以後有機會再說吧!

本BLOG上原創文章未經本人許可,不得用於商業用途及傳統媒體。網絡媒體轉載請註明出處,否則屬於侵權行為。https://juejin.im/post/5c2dd31be51d451ffd25892d

【Dubbo源碼閱讀系列】服務暴露之遠程暴露