1. 程式人生 > >Dubbo原始碼解析之provider呼叫篇

Dubbo原始碼解析之provider呼叫篇

閱讀須知

  • dubbo版本:2.6.0
  • spring版本:4.3.8
  • 文章中使用/* */註釋的方法會做深入分析

正文

在之前的原始碼分析文章中,我們看到了dubbo用netty作為底層的網路通訊框架,熟悉netty的同學應該知道,使用netty時我們會使用它的各種Handler作為處理一些網路事件的處理器,在開啟netty服務時,dubbo添加了NettyHandler作為處理器,pipeline.addLast("handler", nettyHandler);,我們在Dubbo原始碼解析之provider初始化這篇文章中詳細的介紹了這個過程,同樣也詳細說明了handler的整個包裝過程,我們就以NettyHandler作為入口,來分析dubbo的服務呼叫的過程。當然在NettyHandler呼叫之前,請求首先會經過編碼器和解碼器進行編碼和解碼,編解碼的過程我們會用單獨的文章進行分析。NettyHandler繼承了netty的SimpleChannelHandler,通過覆蓋SimpleChannelHandler的相關方法來處理相關事件,provider在收到服務呼叫請求時會觸發messageReceived事件: NettyHandler:

public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
	// 連線成功後新增netty的Channel和dubbo的NettyChannel之間的對映關係
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
    try {
        handler.received(channel, e.getMessage());
    } finally
{ // 如果連線斷開,移除netty的Channel和dubbo的NettyChannel之間的對映關係 NettyChannel.removeChannelIfDisconnected(ctx.getChannel()); } }

方法裡我們看到messageReceived事件委託給了內部維護的另外一個handler(ChannelHandler型別)物件,NettyServer和NettyClient在構造NettyHandler時都傳入了this作為handler,它們都實現了ChannelHandler,我們首先來看NettyServer的相關實現邏輯: AbstractPeer:

public void received(Channel ch, Object msg) throws RemotingException {
    if (closed) {
        return;
    }
    /* 服務呼叫請求處理 */
    handler.received(ch, msg);
}

這裡同樣把連線事件委託給內部維護的一個handler(ChannelHandler型別)物件來處理,這裡的handler是在構造NettyServer時傳入的,我們在Dubbo原始碼解析之provider初始化這篇文章中看到了handler的包裝過程,每次包裝都有不同的功能,我們來逐個分析: MultiMessageHandler:

public void received(Channel channel, Object message) throws RemotingException {
    if (message instanceof MultiMessage) {
        MultiMessage list = (MultiMessage) message;
        for (Object obj: list) {
            handler.received(channel, obj);
        }
    } else {
        handler.received(channel, message);
    }
}

MultiMessageHandler很好理解,就是為多條的這種訊息做一個遍歷。 HeartbeatHandler:

public void received(Channel channel, Object message) throws RemotingException {
    setReadTimestamp(channel);
    // 心跳請求訊息
    if (isHeartbeatRequest(message)) {
        Request req = (Request) message;
        if (req.isTwoWay()) {
            Response res = new Response(req.getId(), req.getVersion());
            res.setEvent(Response.HEARTBEAT_EVENT);
            // 傳送心跳影響事件訊息
            channel.send(res);
            if (logger.isInfoEnabled()) {
                int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
                if (logger.isDebugEnabled()) {
                    logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress() +
                        ", cause: The channel has no data-transmission exceeds a heartbeat period" +
                        (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
                }
            }
        }
        return;
    }
    // 心跳響應訊息
    if (isHeartbeatResponse(message)) {
        if (logger.isDebugEnabled()) {
            logger.debug(
                new StringBuilder(32)
                .append("Receive heartbeat response in thread ")
                .append(Thread.currentThread().getName())
                .toString());
        }
        return;
    }
    handler.received(channel, message);
}

HeartbeatHandler也很簡單,就是對心跳訊息的處理。 AllChannelHandler:

public void received(Channel channel, Object message) throws RemotingException {
    ExecutorService cexecutor = getExecutorService();
    try {
	    /* 封裝任務提交到共享執行緒池執行 */
        cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    } catch (Throwable t) {
	    // 拒絕執行的請求處理
        if (message instanceof Request && t instanceof RejectedExecutionException) {
            Request request = (Request) message;
            if (request.isTwoWay()) {
                String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
                Response response = new Response(request.getId(), request.getVersion());
                response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                response.setErrorMessage(msg);
                channel.send(response);
                return;
            }
        }
        throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
    }
}

ChannelEventRunnable實現了Runnable,我們來看run方法的實現:

public void run() {
    switch (state) {
        case CONNECTED:
            try {
                handler.connected(channel);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
            }
            break;
        case DISCONNECTED:
            try {
                handler.disconnected(channel);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
            }
            break;
        case SENT:
            try {
                handler.sent(channel, message);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel +
                    ", message is " + message, e);
            }
            break;
        case RECEIVED:
            try {
                handler.received(channel, message);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel +
                    ", message is " + message, e);
            }
            break;
        case CAUGHT:
            try {
                handler.caught(channel, exception);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel +
                    ", message is: " + message + ", exception is " + exception, e);
            }
            break;
        default:
            logger.warn("unknown state: " + state + ", message is " + message);
    }
}

很簡單,就是根據不同的事件型別做不同的處理,我們跟著received方法繼續分析。接下來請求訊息會經過DecodeHandler進行解碼,這部分內容我們會在Dubbo編解碼的相關文章中進行分析,接下來訊息會經過HeaderExchangeHandler

public void received(Channel channel, Object message) throws RemotingException {
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    try {
        if (message instanceof Request) {
	        // 處理請求
            Request request = (Request) message;
            if (request.isEvent()) {
	            // 處理事件,判斷是否是隻讀請求並設定只讀標記
                handlerEvent(channel, request);
            } else {
                if (request.isTwoWay()) {
	                /* 處理請求,獲得響應 */
                    Response response = handleRequest(exchangeChannel, request);
                    /* 回覆響應 */
                    channel.send(response);
                } else {
	                /* 無需回覆的請求處理 */
                    handler.received(exchangeChannel, request.getData());
                }
            }
        } else if (message instanceof Response) {
	        /* 響應訊息處理 */
            handleResponse(channel, (Response) message);
        } else if (message instanceof String) {
            if (isClientSide(channel)) {
	            // dubbo客戶端不支援String型別的訊息
                Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                logger.error(e.getMessage(), e);
            } else {
	            // 命令響應
                String echo = handler.telnet(channel, (String) message);
                if (echo != null && echo.length() > 0) {
                    channel.send(echo);
                }
            }
        } else {
            handler.received(exchangeChannel, message);
        }
    } finally {
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}

HeaderExchangeHandler:

Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
    Response res = new Response(req.getId(), req.getVersion());
    // 解碼失敗處理
    if (req.isBroken()) {
        Object data = req.getData();
        String msg;
        if (data == null) msg = null;
        else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
        else msg = data.toString();
        res.setErrorMessage("Fail to decode request due to: " + msg);
        res.setStatus(Response.BAD_REQUEST);
        return res;
    }
    Object msg = req.getData();
    try {
	    /* 請求處理應答 */
        Object result = handler.reply(channel, msg);
        res.setStatus(Response.OK);
        res.setResult(result);
    } catch (Throwable e) {
        res.setStatus(Response.SERVICE_ERROR);
        res.setErrorMessage(StringUtils.toString(e));
    }
    return res;
}

這裡的handler,是由DubboProtocol維護的requestHandler物件,它是ExchangeHandlerAdapter的匿名內部類物件,我們來看相關實現: DubboProtocol.requestHandler:

public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
    if (message instanceof Invocation) {
        Invocation inv = (Invocation) message;
        /* 匹配Invoker */
        Invoker<?> invoker = getInvoker(channel, inv);
        // 如果它是回撥,則需要考慮向後相容性
        if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
            String methodsStr = invoker.getUrl().getParameters().get("methods");
            boolean hasMethod = false;
            // 搜尋是否有相關方法的存在
            if (methodsStr == null || methodsStr.indexOf(",") == -1) {
                hasMethod = inv.getMethodName().equals(methodsStr);
            } else {
                String[] methods = methodsStr.split(",");
                for (String method : methods) {
                    if (inv.getMethodName().equals(method)) {
                        hasMethod = true;
                        break;
                    }
                }
            }
            if (!hasMethod) {
	            // 沒有搜尋到相關方法直接返回null
                logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv);
                return null;
            }
        }
        RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
        /* Invoker呼叫 */
        return invoker.invoke(inv);
    }
    throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}

DubboProtocol:

Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
    boolean isCallBackServiceInvoke = false;
    boolean isStubServiceInvoke = false;
    int port = channel.getLocalAddress().getPort();
    String path = inv.getAttachments().get(Constants.PATH_KEY);
    isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getAttachments().get(Constants.STUB_EVENT_KEY));
    if (isStubServiceInvoke) {
        port = channel.getRemoteAddress().getPort();
    }
    // 回撥
    isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;
    if (isCallBackServiceInvoke) {
        path = inv.getAttachments().get(Constants.PATH_KEY) + "." + inv.getAttachments().get(Constants.CALLBACK_SERVICE_KEY);
        inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());
    }
    // 格式(分組/介面全稱:服務版本:埠),介面和埠是一定存在的,分組和服務版本不一定
    String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));
    // 獲取Exporter
    DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
    if (exporter == null)
        throw new