
Dubbo Remote Invocation Source Code Analysis (Part 3): How the Client Handles the Response

The source code analysis of Dubbo remote invocation is split into three articles.

This part covers what the consumer does when it receives the provider's response.

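When the consumer receives the provider's response, the event handlers of NettyClient are triggered. For the consumer this is an upstream event, so it passes through the decoder of NettyCodecAdapter and then through NettyHandler.

The decoder comes first. The call lands in the messageReceived() method of the InternalDecoder class inside NettyCodecAdapter: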

    private class InternalDecoder extends SimpleChannelUpstreamHandler {

        private com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer =
                com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;

        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
            Object o = event.getMessage();
            if (!(o instanceof ChannelBuffer)) {
                ctx.sendUpstream(event);
                return;
            }

            ChannelBuffer input = (ChannelBuffer) o;
            int readable = input.readableBytes();
            if (readable <= 0) {
                return;
            }

            com.alibaba.dubbo.remoting.buffer.ChannelBuffer message;
            if (buffer.readable()) {
                if (buffer instanceof DynamicChannelBuffer) {
                    buffer.writeBytes(input.toByteBuffer());
                    message = buffer;
                } else {
                    int size = buffer.readableBytes() + input.readableBytes();
                    message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(
                            size > bufferSize ? size : bufferSize);
                    message.writeBytes(buffer, buffer.readableBytes());
                    message.writeBytes(input.toByteBuffer());
                }
            } else {
                message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.wrappedBuffer(
                        input.toByteBuffer());
            }

            NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
            Object msg;
            int saveReaderIndex;

            try {
                // decode object.
                do {
                    saveReaderIndex = message.readerIndex();
                    try {
                        msg = codec.decode(channel, message);
                    } catch (IOException e) {
                        buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                        throw e;
                    }
                    if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                        message.readerIndex(saveReaderIndex);
                        break;
                    } else {
                        if (saveReaderIndex == message.readerIndex()) {
                            buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                            throw new IOException("Decode without read data.");
                        }
                        if (msg != null) {
                            Channels.fireMessageReceived(ctx, msg, event.getRemoteAddress());
                        }
                    }
                } while (message.readable());
            } finally {
                if (message.readable()) {
                    message.discardReadBytes();
                    buffer = message;
                } else {
                    buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                }
                NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
            }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
            ctx.sendUpstream(e);
        }
    }
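The core idea in messageReceived() is to keep undecoded bytes between reads, try to decode, and roll the reader index back when a frame is incomplete. The following is a minimal sketch of that pattern only, not Dubbo's code: it assumes a made-up wire format of a 4-byte big-endian length followed by the payload, and the class name AccumulatingDecoderSketch is hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: frames are [4-byte big-endian length][payload].
final class AccumulatingDecoderSketch {
    // Bytes left over from earlier reads that did not yet form a full frame.
    private ByteBuffer pending = ByteBuffer.allocate(0);

    /** Feeds one network chunk and returns every frame completed by it. */
    List<byte[]> feed(byte[] chunk) {
        // Merge the leftover bytes with the new chunk (the buffer/message merge above).
        ByteBuffer message = ByteBuffer.allocate(pending.remaining() + chunk.length);
        message.put(pending);
        message.put(chunk);
        message.flip();

        List<byte[]> frames = new ArrayList<>();
        while (message.hasRemaining()) {
            int saved = message.position();      // plays the role of saveReaderIndex
            byte[] frame = tryDecode(message);
            if (frame == null) {                 // plays the role of NEED_MORE_INPUT
                message.position(saved);         // undo the partial read
                break;
            }
            frames.add(frame);
        }
        // Whatever is still unread is kept for the next call.
        pending = message.slice();
        return frames;
    }

    /** Returns one payload, or null when the buffer does not hold a full frame yet. */
    private static byte[] tryDecode(ByteBuffer in) {
        if (in.remaining() < 4) {
            return null;
        }
        int len = in.getInt();
        if (len < 0) {
            throw new IllegalArgumentException("negative frame length: " + len);
        }
        if (in.remaining() < len) {
            return null;
        }
        byte[] body = new byte[len];
        in.get(body);
        return body;
    }

    public static void main(String[] args) {
        AccumulatingDecoderSketch decoder = new AccumulatingDecoderSketch();
        // The first chunk ends in the middle of a frame, the second one completes it.
        System.out.println(decoder.feed(new byte[] {0, 0, 0, 2, 'h'}).size());
        System.out.println(decoder.feed(new byte[] {'i'}).size());
    }
}
```

Rolling back to the saved position is what lets the same decode routine be retried from the frame boundary once more bytes arrive.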

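This is the same method that runs when the provider receives a message sent by the consumer. It ends up calling Codec2.decode(), which is implemented in ExchangeCodec, the parent class of DubboCodec: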

    public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
        int readable = buffer.readableBytes();
        byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
        buffer.readBytes(header);
        return decode(channel, buffer, readable, header);
    }

It delegates to the overloaded decode() method:

    protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
        // check magic number.
        if (readable > 0 && header[0] != MAGIC_HIGH
                || readable > 1 && header[1] != MAGIC_LOW) {
            int length = header.length;
            if (header.length < readable) {
                header = Bytes.copyOf(header, readable);
                buffer.readBytes(header, length, readable - length);
            }
            for (int i = 1; i < header.length - 1; i++) {
                if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                    buffer.readerIndex(buffer.readerIndex() - header.length + i);
                    header = Bytes.copyOf(header, i);
                    break;
                }
            }
            return super.decode(channel, buffer, readable, header);
        }
        // check length.
        if (readable < HEADER_LENGTH) {
            return DecodeResult.NEED_MORE_INPUT;
        }

        // get data length.
        int len = Bytes.bytes2int(header, 12);
        checkPayload(channel, len);

        int tt = len + HEADER_LENGTH;
        if (readable < tt) {
            return DecodeResult.NEED_MORE_INPUT;
        }

        // limit input stream.
        ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

        try {
            return decodeBody(channel, is, header);
        } finally {
            if (is.available() > 0) {
                try {
                    if (logger.isWarnEnabled()) {
                        logger.warn("Skip input stream " + is.available());
                    }
                    StreamUtils.skipUnusedStream(is);
                } catch (IOException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
    }
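The length checks in decode() can be restated as a small frame-size calculation over the fixed header. The sketch below is an illustration under assumptions rather than Dubbo's implementation: the offsets 4 (request id) and 12 (body length) come from the code in this article, while the 16-byte header length and the magic value 0xdabb are what I recall from ExchangeCodec and should be checked against your version. The class DubboHeaderSketch is hypothetical, and it simply rejects a bad magic number instead of falling back to the parent codec.

```java
import java.nio.ByteBuffer;

// Hypothetical helper that mirrors the header checks in decode().
final class DubboHeaderSketch {
    static final int HEADER_LENGTH = 16;          // assumed value of HEADER_LENGTH
    static final short MAGIC = (short) 0xdabb;    // assumed MAGIC_HIGH / MAGIC_LOW pair

    /** Returns header + body size, or -1 when more bytes are needed. */
    static int frameSize(ByteBuffer in) {
        if (in.remaining() < HEADER_LENGTH) {
            return -1;                            // header itself is incomplete
        }
        int start = in.position();
        if (in.getShort(start) != MAGIC) {
            throw new IllegalArgumentException("bad magic number");
        }
        int bodyLength = in.getInt(start + 12);   // same offset as Bytes.bytes2int(header, 12)
        int total = HEADER_LENGTH + bodyLength;
        return in.remaining() < total ? -1 : total;
    }

    /** Reads the request id without moving the buffer position. */
    static long requestId(ByteBuffer in) {
        return in.getLong(in.position() + 4);     // same offset as Bytes.bytes2long(header, 4)
    }

    public static void main(String[] args) {
        ByteBuffer frame = ByteBuffer.allocate(HEADER_LENGTH + 3);
        frame.putShort(MAGIC);
        frame.put((byte) 0x02);                   // flag byte
        frame.put((byte) 20);                     // status byte
        frame.putLong(42L);                       // request id
        frame.putInt(3);                          // body length
        frame.put(new byte[] {1, 2, 3});
        frame.flip();
        System.out.println("frame size: " + frameSize(frame) + ", id: " + requestId(frame));
    }
}
```

Absolute reads such as getInt(index) leave the position untouched, so no rollback is needed when the frame turns out to be incomplete.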

The decodeBody() method returned at the end is implemented in the DubboCodec class:

    protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
        byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
        Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
        // get request id.
        long id = Bytes.bytes2long(header, 4);
        if ((flag & FLAG_REQUEST) == 0) {
            // decode response.
            Response res = new Response(id);
            if ((flag & FLAG_EVENT) != 0) {
                res.setEvent(Response.HEARTBEAT_EVENT);
            }
            // get status.
            byte status = header[3];
            res.setStatus(status);
            if (status == Response.OK) {
                try {
                    Object data;
                    if (res.isHeartbeat()) {
                        data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
                    } else if (res.isEvent()) {
                        data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
                    } else {
                        DecodeableRpcResult result;
                        if (channel.getUrl().getParameter(
                                Constants.DECODE_IN_IO_THREAD_KEY,
                                Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                            result = new DecodeableRpcResult(channel, res, is,
                                    (Invocation) getRequestData(id), proto);
                            result.decode();
                        } else {
                            result = new DecodeableRpcResult(channel, res,
                                    new UnsafeByteArrayInputStream(readMessageData(is)),
                                    (Invocation) getRequestData(id), proto);
                        }
                        data = result;
                    }
                    res.setResult(data);
                } catch (Throwable t) {
                    if (log.isWarnEnabled()) {
                        log.warn("Decode response failed: " + t.getMessage(), t);
                    }
                    res.setStatus(Response.CLIENT_ERROR);
                    res.setErrorMessage(StringUtils.toString(t));
                }
            } else {
                res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF());
            }
            return res;
        } else {
            // decode request.
            Request req = new Request(id);
            req.setVersion("2.0.0");
            req.setTwoWay((flag & FLAG_TWOWAY) != 0);
            if ((flag & FLAG_EVENT) != 0) {
                req.setEvent(Request.HEARTBEAT_EVENT);
            }
            try {
                Object data;
                if (req.isHeartbeat()) {
                    data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
                } else if (req.isEvent()) {
                    data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
                } else {
                    DecodeableRpcInvocation inv;
                    if (channel.getUrl().getParameter(
                            Constants.DECODE_IN_IO_THREAD_KEY,
                            Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                        inv = new DecodeableRpcInvocation(channel, req, is, proto);
                        inv.decode();
                    } else {
                        inv = new DecodeableRpcInvocation(channel, req,
                                new UnsafeByteArrayInputStream(readMessageData(is)), proto);
                    }
                    data = inv;
                }
                req.setData(data);
            } catch (Throwable t) {
                if (log.isWarnEnabled()) {
                    log.warn("Decode request failed: " + t.getMessage(), t);
                }
                // bad request
                req.setBroken(true);
                req.setData(t);
            }
            return req;
        }
    }

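When the provider parses a message from the consumer, it runs the "decode request" branch of this method. Here the consumer is receiving a message from the provider, so the "decode response" branch runs.

It first creates a Response object and reads the response status from the header. If the status is not OK, it sets the error message on the Response. Otherwise, for an ordinary response (neither heartbeat nor event), it creates a DecodeableRpcResult. When decoding in the I/O thread is enabled (DECODE_IN_IO_THREAD_KEY), it calls the object's decode() method right away; when it is disabled, the body bytes are copied and decoding is left for a later stage.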
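The branch taken in decodeBody() depends entirely on bits of the flag byte (header[2]). Here is a small sketch of that bit test. The constant names match the ones used in decodeBody(), but their numeric values are not shown in this article; the values below are what I recall from ExchangeCodec and should be treated as assumptions. The class FlagByteSketch is hypothetical.

```java
// Hypothetical helper for reading the flag byte of a Dubbo header.
final class FlagByteSketch {
    static final byte FLAG_REQUEST = (byte) 0x80;   // assumed value
    static final byte FLAG_TWOWAY = (byte) 0x40;    // assumed value
    static final byte FLAG_EVENT = (byte) 0x20;     // assumed value
    static final int SERIALIZATION_MASK = 0x1f;     // assumed value

    static boolean isRequest(byte flag) {
        return (flag & FLAG_REQUEST) != 0;
    }

    static boolean isTwoWay(byte flag) {
        return (flag & FLAG_TWOWAY) != 0;
    }

    static boolean isEvent(byte flag) {
        return (flag & FLAG_EVENT) != 0;
    }

    /** The low bits select the serialization implementation. */
    static int serializationId(byte flag) {
        return flag & SERIALIZATION_MASK;
    }

    static String describe(byte flag) {
        return (isRequest(flag) ? "request" : "response")
                + (isTwoWay(flag) ? ", two-way" : "")
                + (isEvent(flag) ? ", event" : "")
                + ", serialization id " + serializationId(flag);
    }

    public static void main(String[] args) {
        System.out.println(describe((byte) 0xC2));  // request bit and two-way bit set
        System.out.println(describe((byte) 0x02));  // neither set, so a response
    }
}
```

On the consumer side the request bit is clear, which is why the "decode response" branch is the one that runs.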

The decode() method of DecodeableRpcResult looks like this:

    public Object decode(Channel channel, InputStream input) throws IOException {
        ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
                .deserialize(channel.getUrl(), input);

        byte flag = in.readByte();
        switch (flag) {
            case DubboCodec.RESPONSE_NULL_VALUE:
                break;
            case DubboCodec.RESPONSE_VALUE:
                try {
                    Type[] returnType = RpcUtils.getReturnTypes(invocation);
                    setValue(returnType == null || returnType.length == 0 ? in.readObject() :
                            (returnType.length == 1 ? in.readObject((Class<?>) returnType[0])
                                    : in.readObject((Class<?>) returnType[0], returnType[1])));
                } catch (ClassNotFoundException e) {
                    throw new IOException(StringUtils.toString("Read response data failed.", e));
                }
                break;
            case DubboCodec.RESPONSE_WITH_EXCEPTION:
                try {
                    Object obj = in.readObject();
                    if (!(obj instanceof Throwable))
                        throw new IOException("Response data error, expect Throwable, but get " + obj);
                    setException((Throwable) obj);
                } catch (ClassNotFoundException e) {
                    throw new IOException(StringUtils.toString("Read response data failed.", e));
                }
                break;
            default:
                throw new IOException("Unknown result flag, expect '0' '1' '2', get " + flag);
        }
        return this;
    }

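It first obtains the deserializer, then reads a one-byte flag that describes the kind of result and branches on it.

If the flag is DubboCodec.RESPONSE_NULL_VALUE, the call produced no value (null), so the method simply breaks out of the switch.

If the flag is DubboCodec.RESPONSE_WITH_EXCEPTION, the provider threw an exception. The method reads the Throwable, calls the parent class's setException() to set the exception property, and leaves the switch.

If the flag is DubboCodec.RESPONSE_VALUE, there is a return value. The method reads it according to the invocation's return types, calls the parent class's setValue() to set the value property, and leaves the switch.

The parent class of DecodeableRpcResult is RpcResult.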
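The three-way switch can be reproduced with nothing but the JDK. The sketch below is only an illustration: it uses DataInputStream and UTF strings in place of Dubbo's ObjectInput and real serialization, and the flag values 0, 1 and 2 are assigned as I recall them from DubboCodec (the error message above confirms only that those three values exist). ResultFlagSketch is a hypothetical class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for the flag handling in DecodeableRpcResult.decode().
final class ResultFlagSketch {
    static final byte RESPONSE_WITH_EXCEPTION = 0;  // assumed value
    static final byte RESPONSE_VALUE = 1;           // assumed value
    static final byte RESPONSE_NULL_VALUE = 2;      // assumed value

    String value;          // set when the call returned a value
    Throwable exception;   // set when the provider threw

    static ResultFlagSketch decode(byte[] body) {
        ResultFlagSketch result = new ResultFlagSketch();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(body))) {
            byte flag = in.readByte();
            switch (flag) {
                case RESPONSE_NULL_VALUE:
                    break;                                   // nothing more to read
                case RESPONSE_VALUE:
                    result.value = in.readUTF();
                    break;
                case RESPONSE_WITH_EXCEPTION:
                    result.exception = new RuntimeException(in.readUTF());
                    break;
                default:
                    throw new IOException("Unknown result flag: " + flag);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result;
    }

    /** Builds a body in the same toy format, for demonstration only. */
    static byte[] encode(byte flag, String payload) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeByte(flag);
            if (payload != null) {
                out.writeUTF(payload);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println(decode(encode(RESPONSE_VALUE, "ok")).value);
        System.out.println(decode(encode(RESPONSE_WITH_EXCEPTION, "boom")).exception);
    }
}
```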

That completes the decoder part of the event. NettyHandler handles it next, through its messageReceived() method:

    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
        try {
            handler.received(channel, e.getMessage());
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
        }
    }

Next is the received() method of the DecodeHandler class:

    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof Decodeable) {
            decode(message);
        }

        if (message instanceof Request) {
            decode(((Request) message).getData());
        }

        if (message instanceof Response) {
            decode(((Response) message).getResult());
        }

        handler.received(channel, message);
    }
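DecodeHandler calls decode() on the result even though DubboCodec may already have decoded it in the I/O thread, so decoding has to be safe to repeat. The sketch below shows one way such a run-once decode could look. It is a guess at the pattern, not DecodeableRpcResult's actual code; LazyDecodeSketch, bodyReader and decodeCount are hypothetical names.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a Decodeable message body; not Dubbo's class.
final class LazyDecodeSketch {
    private final Supplier<String> bodyReader;  // stands in for deserializing the stream
    private boolean decoded;
    private String value;
    int decodeCount;                            // exposed only to show that decoding runs once

    LazyDecodeSketch(Supplier<String> bodyReader, boolean decodeInIoThread) {
        this.bodyReader = bodyReader;
        if (decodeInIoThread) {
            decode();                           // eager path, taken by the codec
        }
    }

    /** Safe to call repeatedly; only the first call does the work. */
    synchronized void decode() {
        if (decoded) {
            return;
        }
        value = bodyReader.get();
        decodeCount++;
        decoded = true;
    }

    synchronized String value() {
        decode();
        return value;
    }

    public static void main(String[] args) {
        LazyDecodeSketch lazy = new LazyDecodeSketch(() -> "hello", false);
        lazy.decode();   // what a DecodeHandler-style stage would do
        lazy.decode();   // second call is a no-op
        System.out.println(lazy.value() + ", decode ran " + lazy.decodeCount + " time(s)");
    }
}
```

With this shape, the decision of where to decode (I/O thread or a later handler) becomes a configuration choice that the rest of the pipeline does not need to know about.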

Next is the received() method of the HeaderExchangeHandler class:

    public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            if (message instanceof Request) {
                // handle request.
                Request request = (Request) message;
                if (request.isEvent()) {
                    handlerEvent(channel, request);
                } else {
                    if (request.isTwoWay()) {
                        Response response = handleRequest(exchangeChannel, request);
                        channel.send(response);
                    } else {
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            } else if (message instanceof Response) {
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                if (isClientSide(channel)) {
                    Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                    logger.error(e.getMessage(), e);
                } else {
                    String echo = handler.telnet(channel, (String) message);
                    if (echo != null && echo.length() > 0) {
                        channel.send(echo);
                    }
                }
            } else {
                handler.received(exchangeChannel, message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }

Here the message is a Response object, so handleResponse() is called:

    static void handleResponse(Channel channel, Response response) throws RemotingException {
        if (response != null && !response.isHeartbeat()) {
            DefaultFuture.received(channel, response);
        }
    }

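The if condition requires that the response is not a heartbeat. Judging from this check, heartbeat responses between Dubbo endpoints presumably travel through the same method and are filtered out here. For an ordinary response, the method calls DefaultFuture.received():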

    public static void received(Channel channel, Response response) {
        try {
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                future.doReceived(response);
            } else {
                logger.warn("The timeout response finally returned at "
                        + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                        + ", response " + response
                        + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                        + " -> " + channel.getRemoteAddress()));
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }

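FUTURES is a Map<Long, DefaultFuture> keyed by request id. The method first removes the DefaultFuture registered under the response's id. If none is found, which usually means the call has already timed out, it only logs a warning. Otherwise it calls the future's doReceived() method: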

    private void doReceived(Response res) {
        lock.lock();
        try {
            response = res;
            if (done != null) {
                done.signal();
            }
        } finally {
            lock.unlock();
        }
        if (callback != null) {
            invokeCallback(callback);
        }
    }
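The correlation between a response and its waiting caller rests on a map from request id to a pending future. The sketch below shows that idea in isolation. It deliberately swaps in the JDK's CompletableFuture for Dubbo's DefaultFuture, so it is not how Dubbo implements it; PendingRequestsSketch and its method names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry that matches responses to requests by id.
final class PendingRequestsSketch {
    private final Map<Long, CompletableFuture<String>> futures = new ConcurrentHashMap<>();

    /** Sending side: remember a future under the request id before writing the request. */
    CompletableFuture<String> newRequest(long id) {
        CompletableFuture<String> future = new CompletableFuture<>();
        futures.put(id, future);
        return future;
    }

    /** Receiving side: returns false for unknown or late responses. */
    boolean received(long id, String result) {
        CompletableFuture<String> future = futures.remove(id);
        if (future == null) {
            return false;        // nobody is waiting, e.g. the caller already timed out
        }
        future.complete(result); // wakes up whoever waits on the future
        return true;
    }

    public static void main(String[] args) {
        PendingRequestsSketch pending = new PendingRequestsSketch();
        CompletableFuture<String> future = pending.newRequest(7L);
        System.out.println(pending.received(7L, "pong") + " " + future.join());
        System.out.println(pending.received(7L, "duplicate"));
    }
}
```

Removing the entry while looking it up means a duplicate or late response for the same id finds nothing and can only be logged.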

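The lock object used here is a Java reentrant lock, an instance of java.util.concurrent.locks.ReentrantLock. The done object is created by lock.newCondition() and is an instance of java.util.concurrent.locks.Condition. Calling done.signal() wakes up the consumer's calling thread, which went into a blocked state after the consumer sent its message to the provider. The method that blocks the thread is get() in the DefaultFuture class: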

    public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (!isDone()) {
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            if (!isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        return returnFromResponse();
    }

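The done.await() call blocks the thread, but not indefinitely: if the thread is still blocked once the configured timeout has elapsed, it wakes up by itself. The isDone() check that follows only tests whether response is non-null. If the thread woke up because of the timeout, response is still null at that point and a timeout exception is thrown. Finally, returnFromResponse() is called to return the result held in the Response: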

    private Object returnFromResponse() throws RemotingException {
        Response res = response;
        if (res == null) {
            throw new IllegalStateException("response cannot be null");
        }
        if (res.getStatus() == Response.OK) {
            return res.getResult();
        }
        if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
            throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
        }
        throw new RemotingException(channel, res.getErrorMessage());
    }
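The blocking handshake between get() and doReceived() is a standard lock-and-condition wait with a timeout. Below is a minimal standalone sketch of it. It is not DefaultFuture itself: BlockingFutureSketch is a hypothetical class, it throws an unchecked IllegalStateException where Dubbo throws its own TimeoutException, and it waits against a fixed deadline with awaitNanos() rather than calling await() with the full timeout again on each loop iteration.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical miniature of the get()/doReceived() handshake.
final class BlockingFutureSketch {
    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private volatile String response;

    boolean isDone() {
        return response != null;
    }

    /** Blocks the calling thread until a response arrives or the timeout elapses. */
    String get(long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            while (!isDone()) {                       // loop guards against spurious wakeups
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    throw new IllegalStateException("timed out after " + timeoutMillis + " ms");
                }
                done.awaitNanos(remaining);           // releases the lock while waiting
            }
            return response;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();       // preserve the interrupt status
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            lock.unlock();
        }
    }

    /** Called from the I/O thread when the response has been decoded. */
    void received(String res) {
        lock.lock();
        try {
            response = res;
            done.signalAll();                         // wake every waiter, not just one
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        BlockingFutureSketch future = new BlockingFutureSketch();
        new Thread(() -> future.received("pong")).start();
        System.out.println(future.get(1000));
    }
}
```

Checking isDone() inside the loop while holding the lock is what makes the wait correct even if the response arrives before the caller starts waiting.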

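The caller gets back an object when there is a result and an exception when there is an error. That completes the Dubbo remote invocation process.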