1. 程式人生 > >Dubbo原始碼解析之consumer關聯provider

Dubbo原始碼解析之consumer關聯provider

閱讀須知

  • dubbo版本:2.6.0
  • spring版本:4.3.8
  • 文章中使用/* */註釋的方法會做深入分析

正文

我們知道,dubbo的生產者和消費者的關係維護在註冊中心,所以消費者要關聯生產者,必須先訂閱註冊中心中相關的生產者資訊。在《Dubbo原始碼解析之registry註冊中心》這篇文章中,我們分析了dubbo有關註冊中心的一些操作,如註冊、訂閱等;在文章的最後,我們分析了消費者訂閱註冊中心的configuration、routers、providers等資訊的流程。在處理provider資訊的時候,會建立invoker:invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);

我們就接著invoker的建立繼續分析,來看consumer如何關聯provider。
DubboProtocol:

/**
 * Creates the consumer-side invoker for a remote service.
 *
 * @param serviceType interface class of the referenced service
 * @param url         provider URL to connect to
 * @return the newly created invoker, which is also recorded in {@code invokers}
 * @throws RpcException declared by the signature; nothing in this body throws it directly
 */
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
    optimizeSerialization(url);

    /* Obtain the exchange clients for this URL and build the RPC invoker on top of them. */
    DubboInvoker<T> rpcInvoker =
            new DubboInvoker<T>(serviceType, url, getClients(url), invokers);

    // Register the invoker in the protocol-level collection before handing it out.
    invokers.add(rpcInvoker);
    return rpcInvoker;
}

DubboProtocol:

private ExchangeClient[] getClients(URL url) {
    // 是否共享連線
    boolean service_share_connect = false;
    int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
    if (connections == 0) {
        service_share_connect =
true; connections = 1; } // 如果未配置,則共享連線,否則一個服務一個連線 ExchangeClient[] clients = new ExchangeClient[connections]; for (int i = 0; i < clients.length; i++) { if (service_share_connect) { /* 獲取共享的client */ clients[i] = getSharedClient(url); } else { /* 初始化client */ clients[i] = initClient(url); } } return clients; }

DubboProtocol:

/**
 * Returns the shared, reference-counted client for the address of {@code url},
 * creating it when no usable one is cached.
 *
 * The cache is checked once without the lock (fast path) and once more after
 * acquiring it. Without the second check, two threads that both miss the cache
 * would each open a connection and the later {@code put} would overwrite the
 * earlier client, leaving it unreferenced and never closed.
 *
 * @param url provider URL; its address is the cache key
 * @return a client whose reference count accounts for this caller
 */
private ExchangeClient getSharedClient(URL url) {
    String key = url.getAddress();
    ReferenceCountExchangeClient client = referenceClientMap.get(key);
    if (client != null) {
        if (!client.isClosed()) {
            // Cached and still open: just add one more reference.
            client.incrementAndGetCount();
            return client;
        } else {
            // A closed client is evicted from the cache.
            referenceClientMap.remove(key);
        }
    }
    synchronized (key.intern()) {
        // Re-check under the lock: another thread may have created the client
        // between the unsynchronized lookup above and acquiring the monitor.
        client = referenceClientMap.get(key);
        if (client != null && !client.isClosed()) {
            client.incrementAndGetCount();
            return client;
        }
        /* Initialize the client. */
        ExchangeClient exchangeClient = initClient(url);
        // Wrap it so that usage is reference counted.
        client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
        referenceClientMap.put(key, client);
        ghostClientMap.remove(key);
        return client;
    }
}

DubboProtocol:

/**
 * Creates a new exchange client for {@code url}.
 *
 * The URL is given the Dubbo codec and, when absent, the default heartbeat
 * setting. Depending on the lazy-connect parameter the client either connects
 * immediately or is a lazy wrapper.
 *
 * @param url provider URL
 * @return the created client
 * @throws RpcException if the configured client type has no Transporter extension,
 *                      or if creating the remoting client fails
 */
private ExchangeClient initClient(URL url) {
    // Client type: CLIENT_KEY, falling back to SERVER_KEY, then the default
    // remoting client (netty, according to the original note).
    String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
    url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
    // Heartbeat detection is enabled by default.
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
    // BIO is not allowed because it has severe performance problems; only
    // client types backed by a Transporter extension are accepted.
    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
        throw new RpcException("Unsupported client type: " + str + "," +
                " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
    }
    ExchangeClient client;
    try {
        if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
            // Lazy mode: no connection is established here.
            client = new LazyConnectExchangeClient(url, requestHandler);
        } else {
            /* Establish the connection. */
            client = Exchangers.connect(url, requestHandler);
        }
    } catch (RemotingException e) {
        throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
    }
    return client;
}

Exchangers:

/**
 * Connects to the given URL through the exchanger selected for it.
 *
 * @param url     target URL; must not be null
 * @param handler exchange handler; must not be null
 * @return the connected exchange client
 * @throws IllegalArgumentException if {@code url} or {@code handler} is null
 * @throws RemotingException        propagated from the exchanger
 */
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handler == null) {
        throw new IllegalArgumentException("handler == null");
    }
    // Only fills in the codec when the caller has not chosen one already.
    final URL exchangeUrl = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
    /* Establish the connection. */
    return getExchanger(exchangeUrl).connect(exchangeUrl, handler);
}

HeaderExchanger:

/**
 * Connects via the transport layer and wraps the result in a HeaderExchangeClient.
 *
 * @param url     target URL
 * @param handler exchange handler, decorated here before being passed to the transport
 * @return the header exchange client around the transport client
 * @throws RemotingException propagated from the transport connect
 */
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    // Handler chain, outermost first: DecodeHandler -> HeaderExchangeHandler -> handler.
    DecodeHandler decodingHandler = new DecodeHandler(new HeaderExchangeHandler(handler));
    /* Establish the connection and build the HeaderExchangeClient. */
    return new HeaderExchangeClient(Transporters.connect(url, decodingHandler), true);
}

Transporters:

/**
 * Connects to {@code url} using the active transporter.
 *
 * No handlers yields a no-op adapter, exactly one is used as is, and several
 * are combined behind a dispatcher.
 *
 * @param url      target URL; must not be null
 * @param handlers zero or more channel handlers (may be null)
 * @return the transport client
 * @throws IllegalArgumentException if {@code url} is null
 * @throws RemotingException        propagated from the transporter
 */
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }

    final int handlerCount = (handlers == null) ? 0 : handlers.length;
    final ChannelHandler handler;
    if (handlerCount == 0) {
        handler = new ChannelHandlerAdapter();
    } else if (handlerCount == 1) {
        handler = handlers[0];
    } else {
        handler = new ChannelHandlerDispatcher(handlers);
    }

    /* Establish the connection. */
    return getTransporter().connect(url, handler);
}

NettyTransporter:

/**
 * Creates a Netty client for the given URL.
 *
 * @param url      target URL
 * @param listener channel handler passed to the client
 * @return the new NettyClient
 * @throws RemotingException propagated from the NettyClient constructor
 */
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
    /* Create the client. */
    NettyClient nettyClient = new NettyClient(url, listener);
    return nettyClient;
}

NettyClient:

/**
 * Creates the client by delegating to the superclass constructor with a wrapped handler.
 *
 * @param url     target URL
 * @param handler channel handler; wrapped via wrapChannelHandler before being passed up
 * @throws RemotingException propagated from the superclass constructor
 */
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
    /* The handler wrapping process was analyzed in the article on provider export. */
    super(url, wrapChannelHandler(url, handler));
}

AbstractClient:

/**
 * Initializes the client: reads reconnect/shutdown settings from the URL,
 * opens the client via doOpen(), then attempts the initial connect().
 *
 * A failure in doOpen() always closes the client and is rethrown wrapped in a
 * RemotingException. A RemotingException from connect() is rethrown only when
 * the CHECK_KEY parameter is true (its default here); otherwise it is logged
 * as a warning and construction continues. Any other Throwable from connect()
 * closes the client and is wrapped in a RemotingException.
 *
 * @param url     target URL
 * @param handler channel handler passed to the superclass
 * @throws RemotingException if opening fails, or connecting fails and is not tolerated
 */
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
    super(url, handler); // superclass initialization
    send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
    shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT);
    // NOTE(review): the key is spelled "waring" in the literal; it is runtime text and is left as is.
    reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800);
    try {
        doOpen(); /* open the client */
    } catch (Throwable t) {
        close();
        throw new RemotingException(url.toInetSocketAddress(), null,
                "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                        + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
    }
    try {
        connect(); /* establish the connection */
        if (logger.isInfoEnabled()) {
            logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
        }
    } catch (RemotingException t) {
        if (url.getParameter(Constants.CHECK_KEY, true)) {
            close(); // cleanup on failure, e.g. closing the channel
            throw t;
        } else {
            // check == false: the failure is tolerated; the message states it will be retried later.
            logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                    + " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
        }
    } catch (Throwable t) {
        close(); // cleanup on failure, e.g. closing the channel
        throw new RemotingException(url.toInetSocketAddress(), null,
                "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                        + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
    }
    // Take the executor stored in the default DataStore under the consumer side
    // and this URL's port, then remove that entry from the store.
    executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class)
            .getDefaultExtension().get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
    ExtensionLoader.getExtensionLoader(DataStore.class)
            .getDefaultExtension().remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
}

NettyClient:

/**
 * Prepares the Netty ClientBootstrap: sets socket options and installs the
 * pipeline factory. No connection is made here.
 *
 * @throws Throwable declared by the signature
 */
protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();

    bootstrap = new ClientBootstrap(channelFactory);
    bootstrap.setOption("keepAlive", true);
    bootstrap.setOption("tcpNoDelay", true);
    bootstrap.setOption("connectTimeoutMillis", getTimeout());

    // A single handler instance is shared by every pipeline this factory creates.
    final NettyHandler sharedHandler = new NettyHandler(getUrl(), this);
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        public ChannelPipeline getPipeline() {
            // A fresh codec adapter is created for each pipeline.
            NettyCodecAdapter codecAdapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
            ChannelPipeline channelPipeline = Channels.pipeline();
            channelPipeline.addLast("decoder", codecAdapter.getDecoder());
            channelPipeline.addLast("encoder", codecAdapter.getEncoder());
            channelPipeline.addLast("handler", sharedHandler);
            return channelPipeline;
        }
    });
}

開啟服務就是呼叫netty3的API來構建ClientBootstrap。
AbstractClient:

/**
 * Connects to the remote server while holding {@code connectLock}.
 *
 * Returns immediately when already connected. Otherwise it initializes the
 * reconnect check, calls doConnect(), and verifies the connection afterwards.
 * On success the reconnect counter and error-log flag are reset.
 *
 * @throws RemotingException if the client is still not connected after doConnect(),
 *                           or if any other Throwable occurs (wrapped)
 */
protected void connect() throws RemotingException {
    connectLock.lock();
    try {
        if (isConnected()) {
            return;
        }

        /* Initialize the reconnect task. */
        initConnectStatusCheckCommand();
        doConnect(); /* the actual connect operation */

        // Guard clause: still disconnected after doConnect() is reported as a wait timeout.
        if (!isConnected()) {
            throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                    + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                    + ", cause: Connect wait timeout: " + getTimeout() + "ms.");
        }

        if (logger.isInfoEnabled()) {
            logger.info("Successed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                    + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                    + ", channel is " + this.getChannel());
        }

        // Connected: reset the reconnect bookkeeping.
        reconnect_count.set(0);
        reconnect_error_log_flag.set(false);
    } catch (RemotingException remotingFailure) {
        // Already the right type; rethrow unchanged rather than wrapping it again.
        throw remotingFailure;
    } catch (Throwable unexpected) {
        throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                + ", cause: " + unexpected.getMessage(), unexpected);
    } finally {
        connectLock.unlock();
    }
}

AbstractClient:

private synchronized void initConnectStatusCheckCommand() {
    int reconnect = getReconnectParam(getUrl());
    if (reconnect > 0 && (reconnectExecutorFuture == null || reconnectExecutorFuture.isCancelled())) {
        // 重連任務
        Runnable connectStatusCheckCommand = new Runnable() {
            public void run() {
                try {
                    if (!isConnected()) {
                        connect(); // 建立連線
                    } else {
                        lastConnectedTime = System.currentTimeMillis();
                    }
                } catch (Throwable t) {
                    String errorMsg = "client reconnect to " + getUrl().getAddress() + " find error . url: " + getUrl();
                    // 等待註冊中心同步provider列表
                    if (System.currentTimeMillis() - lastConnectedTime > shutdown_timeout) {
                        if (!reconnect_error_log_flag.get()) {
                            reconnect_error_log_flag.set(true);
                            logger.error(errorMsg, t);
                            return;
                        }
                    }
                    if (reconnect_count.getAndIncrement() % reconnect_warning_period <