1. 程式人生 > >dubbo原始碼分析-服務呼叫流程-筆記

dubbo原始碼分析-服務呼叫流程-筆記

消費端呼叫過程流程圖

消費端的呼叫過程

消費端介面例項:

服務端接收訊息處理過程

NettyHandler. messageReceived

  • 接收訊息的時候,通過NettyHandler.messageReceived作為入口
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
    try {
        handler.received(channel, e.getMessage());
    } finally {
        NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
    }
}

handler.received

  • 這個handler是什麼呢?還記得在服務釋出的時候,組裝了一系列的handler嗎?程式碼如下

HeaderExchanger.bind

public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}

NettyServer

  • 接著又在Nettyserver中,wrap了多個handler
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
    super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
    return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
            .getAdaptiveExtension().dispatch(handler, url)));
}

所以服務端的handler處理鏈為

  • MultiMessageHandler(HeartbeatHandler(AllChannelHandler(DecodeHandler)))
  • MultiMessageHandler: 複合訊息處理
  • HeartbeatHandler:心跳訊息處理,接收心跳併發送心跳響應
  • AllChannelHandler:業務執行緒轉化處理器,把接收到的訊息封裝成ChannelEventRunnable可執行任務給執行緒池處理
  • DecodeHandler:業務解碼處理器

HeaderExchangeHandler.received

  • 互動層請求響應處理,有三種處理方式:
    1. handlerRequest,雙向請求
    2. handler.received 單向請求
    3. handleResponse 響應訊息
public void received(Channel channel, Object message) throws RemotingException {
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    try {
        if (message instanceof Request) {
            // handle request.
            Request request = (Request) message;
            if (request.isEvent()) {
                handlerEvent(channel, request);
            } else {
                if (request.isTwoWay()) {
                    Response response = handleRequest(exchangeChannel, request);
                    channel.send(response);
                } else {
                    handler.received(exchangeChannel, request.getData());
                }
            }
        } else if (message instanceof Response) {
            handleResponse(channel, (Response) message);
        } else if (message instanceof String) {
            if (isClientSide(channel)) {
                Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                logger.error(e.getMessage(), e);
            } else {
                String echo = handler.telnet(channel, (String) message);
                if (echo != null && echo.length() > 0) {
                    channel.send(echo);
                }
            }
        } else {
            handler.received(exchangeChannel, message);
        }
    } finally {
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}

handleRequest

  • 處理請求並返回response
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
    Response res = new Response(req.getId(), req.getVersion());
    if (req.isBroken()) {
        Object data = req.getData();
        String msg;
        if (data == null) msg = null;
        else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
        else msg = data.toString();
        res.setErrorMessage("Fail to decode request due to: " + msg);
        res.setStatus(Response.BAD_REQUEST);

        return res;
    }
    // find handler by message class.
    Object msg = req.getData();
    try {
        // handle data.
        Object result = handler.reply(channel, msg);
        res.setStatus(Response.OK);
        res.setResult(result);
    } catch (Throwable e) {
        res.setStatus(Response.SERVICE_ERROR);
        res.setErrorMessage(StringUtils.toString(e));
    }
    return res;
}

ExchangeHandlerAdaptive.replay(DubboProtocol)

  • 呼叫DubboProtocol中定義的ExchangeHandlerAdaptive.replay方法處理訊息
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
    
    public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
     invoker.invoke(inv);
}
  • 那接下來invoker.invoke會呼叫哪個類中的方法呢?
  • 還記得在RegistryDirectory中釋出本地方法的時候,對invoker做的包裝嗎?
  • 通過InvokerDelegete對原本的invoker做了一層包裝,而原本的invoker是什麼呢?
  • 是一個JavassistProxyFactory生成的動態代理吧。
  • 所以此處的invoker應該是:

Filter(Listener(InvokerDelegete(AbstractProxyInvoker (Wrapper.invokeMethod)))

  • RegistryDirectory生成invoker的程式碼如下:
private <T> ExporterChangeableWrapper<T>  doLocalExport(final Invoker<T> originInvoker){
    String key = getCacheKey(originInvoker);
    ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
    if (exporter == null) {
        synchronized (bounds) {
            exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
            if (exporter == null) {
                final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker);
                bounds.put(key, exporter);
            }
        }
    }
    return (ExporterChangeableWrapper<T>) exporter;
}