1. 程式人生 > >Dubbo/Dubbox的dubbo協議實現(一)-服務端啟動

Dubbo/Dubbox的dubbo協議實現(一)-服務端啟動

之前已經分析的dubbo的服務的發現和註冊,這裡先看一下dubbo協議是如何實現的,之前已經知道了,呼叫DubboProtocol類的export來暴露服務的,協議實現比較複雜,這裡只關係主體實現即排除一些特性功能的處理程式碼

本章主要處理服務端對應的暴露流程,繼續回到···com.alibaba.dubbo.config.ServiceConfig···的doExportUrlsFor1Protocol方法(487行附近)

Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded
(Constants.EXPORT_KEY, url.toFullString())); Exporter<?> exporter = protocol.export(invoker); exporters.add(exporter);

proxyFactory 是一個全域性變數(自適應擴充套件點),之前已經反覆複習不多說明了
proxyFactory.getInvoker實現如下,該方法內部會呼叫一個封裝類com.alibaba.dubbo.common.bytecode.Wrapper,該類封裝了例項的方法呼叫。proxyFactory.getInvoker 會返回一個

public
<T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // TODO Wrapper類不能正確處理帶$的類名 final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); return new AbstractProxyInvoker<T>(proxy, type, url
) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); }
}; }

這裡com.alibaba.dubbo.common.bytecode.Wrapper類不做過多說明,其會生成有一個名為invokeMethod封裝方法,invokeMethod 方法根據傳入例項和方法名,呼叫例項對應的方法,invokeMethod方法簽名如下:

  /**
     *
     * @param o 介面類例項
     * @param n 方法名
     * @param p 引數型別
     * @param v 引數值
     * @return
     * @throws java.lang.reflect.InvocationTargetException
     */
    public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException

所以到這裡可以發現,對於服務的提供者來說通過com.alibaba.dubbo.rpc.Invoker的 protected Object doInvoke(T proxy, String methodName,Class

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        /*url此刻內容如下。
        *dubbo://ip:port/服務介面?anyhost=true&application=dsp-ad&default.timeout=10000&dubbo=2.8.4&generic=false&
        *interface=服務介面&logger=log4j&methods=介面方法名&organization=company&owner=weiythi&pid=5948&revision=1.6.1&serialization=kryo&side=provider&timestamp=1511849953486
        */
        URL url = invoker.getUrl();

        // export service.
        // serviceKey為 服務介面類名:埠號
        String key = serviceKey(url);
        //DubboExporter 實現介面 com.alibaba.dubbo.rpc.Exporter ,會在抽象實現com.alibaba.dubbo.rpc.protocol.AbstractExporter 的基礎上維護exporterMap ,key
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);
        //省略......export an stub service for dispaching event......//

        //開啟服務
        openServer(url);

        // modified by lishen 序列化優化相關,暫不去了解
        optimizeSerialization(url);

        return exporter;
    }

這裡先找到最關心的點“openServer(url);”,其是dubbo協議服務暴露的入口,其實現如下:

private void openServer(URL url) {
        // find server.
        // 這裡是ip:port
        String key = url.getAddress();
        //client 也可以暴露一個只有server可以呼叫的服務。
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);
        if (isServer) {//true
            //按照ip:port為key 找到資訊交換層的服務端
            ExchangeServer server = serverMap.get(key);
            if (server == null) {//如果木有建立過server端,則建立並將服務端快取
                serverMap.put(key, createServer(url));
            } else {
                //server支援reset,配合override功能使用
                server.reset(url); //這個暫時木有看懂,等回過頭來看
            }
        }
    }

注意這裡serverMap雖說不是一個static域,但基於擴充套件點機制的實現,貌似都是單例的,所以這裡一個服務端程序可以為多個provider提供服務,那麼createServer(url)是如何開啟服務的呢?
createServer(url)的方法實現如下, 注意該方法會通過url匯流排的server引數來決定使用dubbo的協議實現,dubbox這裡預設nio框架是netty

private ExchangeServer createServer(URL url) {
        //預設開啟server關閉時傳送readonly事件
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
        //預設開啟heartbeat
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
        //獲得協議的server實現,預設netty
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

        if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);

        url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
        ExchangeServer server;
        try {
            //注意這裡的requestHandler 是一個com.alibaba.dubbo.remoting.exchange.ExchangeHandler 的介面實現。
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
    }

在方法中能注意到一個閃閃亮的語句,server = Exchangers.bind(url, requestHandler); 這裡傳入了一個引數requestHandler,requestHandler在此類是一個com.alibaba.dubbo.remoting.exchange.ExchangeHandler的介面實現,看了下原始碼這裡又引入了一系列暫時不知道用處的類,所以此處暫時忽略介面實現,既然其是一個handler所以接到consumer的呼叫之後必然會執行的,先放在一邊。我們接資料的時候一起看。
com.alibaba.dubbo.remoting.exchange.Exchangers 類主要有兩個功能,即伺服器的繫結和客戶端的連線功能
Exchangers.bind(url, requestHandler); 內部又涉及一系列的擴充套件點,不貼程式碼出來了。簡單介紹一下, 這裡肯定實現的功能就是伺服器的綁定了
1.bind方法會根據URL匯流排的exchanger引數獲得一個com.alibaba.dubbo.remoting.exchange.Exchanger 擴充套件點例項,本例例項為(com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchanger),嗯 目前已知的擴充套件點實現也就這一個。
2.Exchanger?在一個框架中我們通常把負責資料交換和網路通訊的元件叫做Exchanger ,接下來呼叫 ExchangeServer bind(URL url, ExchangeHandler handler) 方法,注意這裡又是一個自適應擴充套件點。

接下來進入到Exchanger僅有的一個實現com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchanger

    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

可見在該方法中,建立了一個com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeServer 物件,當然根據方法定義該類肯定實現介面com.alibaba.dubbo.remoting.exchange.ExchangeServer 這裡先暫時不關心該介面行為,因為建立物件前,先對handler進行了兩層封裝,然後使用com.alibaba.dubbo.remoting.Transporters的bind方法,進行繫結。
對handler的封裝可能會絕對handler呼叫前或呼叫後的行為,設定決定了handler的呼叫方式,那麼先去確定com.alibaba.dubbo.remoting.transport.DecodeHandler 的運作情況。
額,點開看了一下貌似也挺複雜,因為這個類一頓繼承啊~,找一下自己關心的兩個東東,分別是,我家的HeaderExchangeHandler在哪裡?有木有能讓我接受到訊息呼叫到的方法。
然後發現這兩個重點是這樣的

public DecodeHandler(ChannelHandler handler) {
        super(handler); //父類構造器
    }

    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof Decodeable) {
            decode(message);
        }

        if (message instanceof Request) {
            decode(((Request)message).getData());
        }

        if (message instanceof Response) {
            decode( ((Response)message).getResult());
        }
        //呼叫received方法,既然我們的類已經被封裝了兩層,那麼這裡應該呼叫的是HeaderExchangeHandler
        handler.received(channel, message);
    }

    private void decode(Object message) { //這個方法沒有任何handler相關處理,暫不去關心。
        if (message != null && message instanceof Decodeable) {
            try {
                ((Decodeable)message).decode();
                if (log.isDebugEnabled()) {
                    log.debug(new StringBuilder(32).append("Decode decodeable message ")
                                  .append(message.getClass().getName()).toString());
                }
            } catch (Throwable e) {
                if (log.isWarnEnabled()) {
                    log.warn(
                        new StringBuilder(32)
                            .append("Call Decodeable.decode failed: ")
                            .append(e.getMessage()).toString(),
                        e);
                }
            } // ~ end of catch
        } // ~ end of if
    } // ~ end of method decode

那麼再去看看 HeaderExchangeHandler 是個啥
這裡寫圖片描述
還有既然會呼叫received方法,主要去看看received的實現。點開一看略複雜、、、、好多東東,看到這裡還不能理解,所以又去敲Transporters.bind的門,因為已經先行忽略了handler這個,這裡只關心服務是怎麼被open的
程式碼裡又耍流氓了,又是自適應擴充套件點,但是我已不怕,開啟看配置檔案直接鎖定實現com.alibaba.dubbo.remoting.transport.netty.NettyTransporter 然後去看bind方法(等等索性直接看了)

/*
 * Copyright 1999-2011 Alibaba Group.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.dubbo.remoting.transport.netty;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.remoting.ChannelHandler;
import com.alibaba.dubbo.remoting.Client;
import com.alibaba.dubbo.remoting.RemotingException;
import com.alibaba.dubbo.remoting.Server;
import com.alibaba.dubbo.remoting.Transporter;

/**
 * @author ding.lid
 */
public class NettyTransporter implements Transporter {

    public static final String NAME = "netty";

    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }

    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyClient(url, listener);
    }

}

這個很明顯麼,直接new了一個com.alibaba.dubbo.remoting.transport.netty.NettyServer 出來,那麼問題簡單了,直接去看建構函式

public NettyServer(URL url, ChannelHandler handler) throws RemotingException{
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }

呼叫了父類的構造器

public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        // 哭 handler又甩給上層了
        super(url, handler);
        localAddress = getUrl().toInetSocketAddress();
        String host = url.getParameter(Constants.ANYHOST_KEY, false)
                        || NetUtils.isInvalidLocalHost(getUrl().getHost())
                        ? NetUtils.ANYHOST : getUrl().getHost();
        bindAddress = new InetSocketAddress(host, getUrl().getPort());
        this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
        this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
        try {
            //好吧,偷偷的呼叫了doOpen 害我好找
            doOpen();
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
            }
        } catch (Throwable t) {
            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                                        + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
        }

        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class)
                .getDefaultExtension().get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
    }

主要關注兩個事,就是我們的handler哪去了,後面可能還要用呢,還有服務在哪開的?
首先看那第一行,呼叫父類的構造器,一直跟著找,發現handler在com.alibaba.dubbo.remoting.transport.AbstractPeer 中維護了,然後AbstractPeer又是一個ChannelHandler ,好吧這個關係很混亂。先記著這個關鍵點,上一次看見類似風格是在RegistryDictory中
既然呼叫了doOpen,這個就不用想了,方法的實現就在NettyServer裡啊,但是netty之前沒玩過,所以只能看個熱鬧

@Override
    protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
        bootstrap = new ServerBootstrap(channelFactory);
        // 這裡的NettyHandler 封裝了一個this? 這個this是什麼?
        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        channels = nettyHandler.getChannels();
        // https://issues.jboss.org/browse/NETTY-365
        // https://issues.jboss.org/browse/NETTY-379
        // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
                ChannelPipeline pipeline = Channels.pipeline();
                /*int idleTimeout = getIdleTimeout();
                if (idleTimeout > 10000) {
                    pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
                }*/
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
        // bind
        channel = bootstrap.bind(getBindAddress());
    }

因為35太難了,我學不會,而且server開啟之後只要能一直提供服務就OK,我只能關注另一個點就是接到訊息後怎麼處理的。所以這裡跟訊息有關的就一個nettyHandler ,其初始化是通過這樣的語句完成的,這裡傳入了一個this,我們要知道這個this是什麼。

final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);

@Sharable
public class NettyHandler extends SimpleChannelHandler {

    private final Map<String, Channel> channels = new ConcurrentHashMap<String, Channel>(); // <ip:port, channel>

    private final URL url;

    private final ChannelHandler handler;
    //.......//
    public NettyHandler(URL url, ChannelHandler handler){
            if (url == null) {
                throw new IllegalArgumentException("url == null");
            }
            if (handler == null) {
                throw new IllegalArgumentException("handler == null");
            }
            this.url = url;
            this.handler = handler;
        }
}

好吧,很巧妙的方式,因為NettyServer他是一個ChannelHandler啊,是一個ChannelHandler啊…啊。
迷只混亂,只能拿出他家家譜,然後默默流眼淚
這個家譜裡貌似沒有我們熟悉的朋友。
這裡寫圖片描述
看看他的長輩有沒有人認識我們上文忽略的com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeServer,因為是這麼個關係new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); 所以去看看他的構造器

public HeaderExchangeServer(Server server) {
        if (server == null) {
            throw new IllegalArgumentException("server == null");
        }
        this.server = server;
        this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
        this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
        if (heartbeatTimeout < heartbeat * 2) {
            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
        }
        startHeatbeatTimer();
    }

引數正好是NettyServer所實現的介面com.alibaba.dubbo.remoting.Server ,到這裡我們就返回一個ExchangeServer了,不過好像目前為止並沒有有什麼實質性的動作,回到createServer

好吧,到這一個服務就建立完了。總結一下開啟服務這一部的呼叫關係(引用大佬的)

openServer netty server開啟偵聽服務,並快取服務。
dubbo -> export() -> openServer() -> createServer() -> Exchangers.bind()(HeaderExchanger) -> NettyTransporter -> NettyServer.open()(編碼解碼採用exchange)
netty ChannelFactory(boss worker) ChannelPipeline(責任鏈模式) ChannelHandler(處理器) — 反應器模式