dubbo遠端呼叫原始碼分析(三):客戶端接收反饋後的處理
dubbo遠端呼叫的原始碼分析共分成三篇文章,地址分別如下:
下面是consumer接收到provider反饋時的處理
consumer接收到provider的反饋後,觸發NettyClient的事件處理器,該事件對consumer來說是上行事件,觸發的是NettyCodecAdapter.DeCoder和NettyHandler
首先是NettyCodecAdapter.DeCoder,呼叫的是NettyCodecAdapter中的InternalDecoder類的messageReceived()方法:
private class InternalDecoder extends SimpleChannelUpstreamHandler { private com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER; @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception{ Object o =event.getMessage(); if (!(o instanceof ChannelBuffer)) { ctx.sendUpstream(event); return; } ChannelBuffer input = (ChannelBuffer) o; int readable = input.readableBytes(); if(readable <= 0) { return; } com.alibaba.dubbo.remoting.buffer.ChannelBuffer message; if(buffer.readable()) { if(buffer instanceof DynamicChannelBuffer) { buffer.writeBytes(input.toByteBuffer()); message = buffer; } else{ int size = buffer.readableBytes() + input.readableBytes(); message =com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer( size > bufferSize ? size : bufferSize); message.writeBytes(buffer, buffer.readableBytes()); message.writeBytes(input.toByteBuffer()); } } else { message =com.alibaba.dubbo.remoting.buffer.ChannelBuffers.wrappedBuffer( input.toByteBuffer()); } NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(),url, handler); Object msg; int saveReaderIndex; try { //decode object. 
do { saveReaderIndex = message.readerIndex(); try{ msg = codec.decode(channel, message); }catch (IOException e) { buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER; throw e; } if(msg == Codec2.DecodeResult.NEED_MORE_INPUT) { message.readerIndex(saveReaderIndex); break; }else { if (saveReaderIndex == message.readerIndex()) { buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER; throw new IOException("Decode without read data."); } if (msg != null) { Channels.fireMessageReceived(ctx, msg, event.getRemoteAddress()); } } } while(message.readable()); } finally { if(message.readable()) { message.discardReadBytes(); buffer = message; } else{ buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER; } NettyChannel.removeChannelIfDisconnected(ctx.getChannel()); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { ctx.sendUpstream(e); } }
這和consumer給provider傳送訊息時(provider端解碼)呼叫的是同一個方法,方法最後呼叫了Codec2.decode()方法,這個方法的實現在DubboCodec的父類ExchangeCodec中:
/**
 * Reads up to {@code HEADER_LENGTH} bytes from the buffer as the header and
 * hands off to the header-aware {@code decode} overload.
 *
 * @param channel the channel the data arrived on
 * @param buffer  the buffer to decode from
 * @return whatever the header-aware overload returns
 * @throws IOException if decoding fails
 */
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    final int readable = buffer.readableBytes();
    // The header may be shorter than HEADER_LENGTH when fewer bytes are available.
    final int headerSize = Math.min(readable, HEADER_LENGTH);
    final byte[] header = new byte[headerSize];
    buffer.readBytes(header);
    return decode(channel, buffer, readable, header);
}
然後decode()方法:
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header)throws IOException { // check magic number. if (readable> 0 && header[0] != MAGIC_HIGH ||readable > 1 && header[1] != MAGIC_LOW) { int length= header.length; if(header.length < readable) { header= Bytes.copyOf(header, readable); buffer.readBytes(header, length, readable - length); } for (int i= 1; i < header.length - 1; i++) { if(header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) { buffer.readerIndex(buffer.readerIndex() - header.length + i); header = Bytes.copyOf(header, i); break; } } return super.decode(channel, buffer, readable, header); } // check length. if (readable< HEADER_LENGTH) { return DecodeResult.NEED_MORE_INPUT; } // get data length. int len =Bytes.bytes2int(header, 12); checkPayload(channel, len); int tt = len +HEADER_LENGTH; if (readable< tt) { return DecodeResult.NEED_MORE_INPUT; } // limit inputstream. ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len); try { return decodeBody(channel, is, header); } finally { if(is.available() > 0) { try { if(logger.isWarnEnabled()) { logger.warn("Skip input stream " + is.available()); } StreamUtils.skipUnusedStream(is); } catch(IOException e) { logger.warn(e.getMessage(), e); } } } }
最後return的decodeBody()方法在DubboCodec類中實現:
/**
 * Decodes the body of a frame into a {@code Request} or a {@code Response},
 * depending on the request bit of the flag byte at {@code header[2]}.
 *
 * <p>Decode failures are not propagated: a response is marked {@code CLIENT_ERROR}
 * with the error text, and a request is marked broken with the throwable as its data.
 *
 * @param channel the channel the data arrived on
 * @param is      stream over the body bytes
 * @param header  the frame header (flag at index 2, status at index 3, id from index 4)
 * @return the decoded {@code Request} or {@code Response}
 * @throws IOException if reading the error message of a non-OK response fails
 */
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
    final byte flag = header[2];
    final byte proto = (byte) (flag & SERIALIZATION_MASK);
    final Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
    // get request id.
    final long id = Bytes.bytes2long(header, 4);

    if ((flag & FLAG_REQUEST) != 0) {
        // decode request.
        final Request req = new Request(id);
        req.setVersion("2.0.0");
        req.setTwoWay((flag & FLAG_TWOWAY) != 0);
        if ((flag & FLAG_EVENT) != 0) {
            req.setEvent(Request.HEARTBEAT_EVENT);
        }
        try {
            final Object data;
            if (req.isHeartbeat()) {
                data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
            } else if (req.isEvent()) {
                data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
            } else {
                final boolean decodeInIoThread = channel.getUrl().getParameter(
                        Constants.DECODE_IN_IO_THREAD_KEY,
                        Constants.DEFAULT_DECODE_IN_IO_THREAD);
                final DecodeableRpcInvocation inv;
                if (decodeInIoThread) {
                    inv = new DecodeableRpcInvocation(channel, req, is, proto);
                    inv.decode();
                } else {
                    // Copy the body bytes so decoding can happen later.
                    inv = new DecodeableRpcInvocation(channel, req,
                            new UnsafeByteArrayInputStream(readMessageData(is)), proto);
                }
                data = inv;
            }
            req.setData(data);
        } catch (Throwable t) {
            if (log.isWarnEnabled()) {
                log.warn("Decode request failed: " + t.getMessage(), t);
            }
            // bad request
            req.setBroken(true);
            req.setData(t);
        }
        return req;
    }

    // decode response.
    final Response res = new Response(id);
    if ((flag & FLAG_EVENT) != 0) {
        res.setEvent(Response.HEARTBEAT_EVENT);
    }
    // get status.
    final byte status = header[3];
    res.setStatus(status);

    if (status != Response.OK) {
        // Non-OK responses carry only an error message.
        res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF());
        return res;
    }

    try {
        final Object data;
        if (res.isHeartbeat()) {
            data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
        } else if (res.isEvent()) {
            data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
        } else {
            final boolean decodeInIoThread = channel.getUrl().getParameter(
                    Constants.DECODE_IN_IO_THREAD_KEY,
                    Constants.DEFAULT_DECODE_IN_IO_THREAD);
            final DecodeableRpcResult result;
            if (decodeInIoThread) {
                result = new DecodeableRpcResult(channel, res, is,
                        (Invocation) getRequestData(id), proto);
                result.decode();
            } else {
                // Copy the body bytes so decoding can happen later.
                result = new DecodeableRpcResult(channel, res,
                        new UnsafeByteArrayInputStream(readMessageData(is)),
                        (Invocation) getRequestData(id), proto);
            }
            data = result;
        }
        res.setResult(data);
    } catch (Throwable t) {
        if (log.isWarnEnabled()) {
            log.warn("Decode response failed: " + t.getMessage(), t);
        }
        res.setStatus(Response.CLIENT_ERROR);
        res.setErrorMessage(StringUtils.toString(t));
    }
    return res;
}
provider解析consumer發來的訊息時呼叫的是方法中decode request部分,現在consumer接收provider訊息,呼叫的是方法的decode response部分
首先建立了一個Response物件,從header中得到Response狀態,如果status不是OK,則在Response設定ErrorMessage;否則(非心跳、非事件的一般回應)建立一個DecodeableRpcResult物件,若設定為在IO執行緒中解碼,則立即呼叫它的decode()方法
DecodeableRpcResult的decode()方法如下:
/**
 * Deserializes an RPC result from {@code input} into this object.
 *
 * <p>The first byte is a flag selecting the payload kind: no value, a return value
 * (stored via {@code setValue}), or a thrown exception (stored via {@code setException}).
 *
 * @param channel the channel the response arrived on; its URL selects the serialization
 * @param input   stream over the serialized result
 * @return this object, populated with the decoded value or exception
 * @throws IOException if the flag is unknown, the exception payload is not a
 *                     {@code Throwable}, or a class needed for deserialization is missing
 */
public Object decode(Channel channel, InputStream input) throws IOException {
    ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
            .deserialize(channel.getUrl(), input);

    byte flag = in.readByte();
    switch (flag) {
        case DubboCodec.RESPONSE_NULL_VALUE:
            break;
        case DubboCodec.RESPONSE_VALUE:
            try {
                Type[] returnType = RpcUtils.getReturnTypes(invocation);
                // Use the declared return type(s) when available to guide deserialization.
                setValue(returnType == null || returnType.length == 0 ? in.readObject() :
                        (returnType.length == 1 ? in.readObject((Class<?>) returnType[0])
                                : in.readObject((Class<?>) returnType[0], returnType[1])));
            } catch (ClassNotFoundException e) {
                // Keep the original exception as the cause instead of only embedding its text.
                throw new IOException(StringUtils.toString("Read response data failed.", e), e);
            }
            break;
        case DubboCodec.RESPONSE_WITH_EXCEPTION:
            try {
                Object obj = in.readObject();
                if (!(obj instanceof Throwable)) {
                    throw new IOException("Response data error, expect Throwable, but get " + obj);
                }
                setException((Throwable) obj);
            } catch (ClassNotFoundException e) {
                throw new IOException(StringUtils.toString("Read response data failed.", e), e);
            }
            break;
        default:
            throw new IOException("Unknown result flag, expect '0' '1' '2', get " + flag);
    }
    return this;
}
首先是反序列化,然後判斷返回值型別
如果沒有返回值型別,也就是flag=DubboCodec.RESPONSE_NULL_VALUE,則直接break退出switch
如果返回值型別裡有異常,也就是flag=DubboCodec.RESPONSE_WITH_EXCEPTION,則呼叫父類的setException()方法,設定Exception屬性並退出switch
如果返回值有型別,也就是flag=DubboCodec.RESPONSE_VALUE,則呼叫父類的setValue()方法設定Value屬性然後退出switch
DecodeableRpcResult的父類是RpcResult類。
至此,事件的NettyCodecAdapter.DeCoder部分處理完畢,然後是NettyHandler的處理,呼叫的是NettyHandler的messageReceived()方法:
/**
 * Passes the received message to the wrapped handler on the Dubbo channel
 * associated with the Netty channel.
 *
 * @param ctx the Netty handler context
 * @param e   the inbound message event
 * @throws Exception if the wrapped handler fails
 */
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
    final NettyChannel nettyChannel =
            NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
    try {
        final Object payload = e.getMessage();
        handler.received(nettyChannel, payload);
    } finally {
        // Runs whether or not the handler threw.
        NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
    }
}
然後是DecodeHandler類的received方法:
/**
 * Decodes the message (and, for requests and responses, its payload) before
 * passing it to the wrapped handler.
 *
 * <p>The three checks are independent; more than one may apply to the same message.
 *
 * @param channel the channel the message arrived on
 * @param message the received message
 * @throws RemotingException if the wrapped handler fails
 */
public void received(Channel channel, Object message) throws RemotingException {
    if (message instanceof Decodeable) {
        decode(message);
    }

    if (message instanceof Request) {
        final Request request = (Request) message;
        decode(request.getData());
    }

    if (message instanceof Response) {
        final Response response = (Response) message;
        decode(response.getResult());
    }

    handler.received(channel, message);
}
然後是HeaderExchangeHandler類的received()方法:
/**
 * Dispatches a received message by type: requests (event, two-way, one-way),
 * responses, telnet strings, or anything else.
 *
 * <p>The read timestamp is recorded on the channel before dispatching.
 *
 * @param channel the channel the message arrived on
 * @param message the received message
 * @throws RemotingException if handling or sending a reply fails
 */
public void received(Channel channel, Object message) throws RemotingException {
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    try {
        if (message instanceof Request) {
            // handle request.
            Request request = (Request) message;
            if (request.isEvent()) {
                handlerEvent(channel, request);
            } else {
                if (request.isTwoWay()) {
                    // Two-way request: compute the response and send it back.
                    Response response = handleRequest(exchangeChannel, request);
                    channel.send(response);
                } else {
                    handler.received(exchangeChannel, request.getData());
                }
            }
        } else if (message instanceof Response) {
            handleResponse(channel, (Response) message);
        } else if (message instanceof String) {
            if (isClientSide(channel)) {
                // String messages are only logged as an error on the client side.
                Exception e = new Exception("Dubbo client can not supported string message: "
                        + message + " in channel: " + channel + ", url: " + channel.getUrl());
                logger.error(e.getMessage(), e);
            } else {
                String echo = handler.telnet(channel, (String) message);
                if (echo != null && echo.length() > 0) {
                    channel.send(echo);
                }
            }
        } else {
            handler.received(exchangeChannel, message);
        }
    } finally {
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}
這裡的message是一個Response物件,要呼叫handleResponse()方法:
/**
 * Hands a non-heartbeat response to {@code DefaultFuture}; null and heartbeat
 * responses are ignored.
 *
 * @param channel  the channel the response arrived on
 * @param response the received response, may be {@code null}
 * @throws RemotingException declared for callers; not thrown by the visible code
 */
static void handleResponse(Channel channel, Response response) throws RemotingException {
    if (response == null || response.isHeartbeat()) {
        return;
    }
    DefaultFuture.received(channel, response);
}
這個方法的if判斷裡要求這個response不能是心跳,目測dubbo介面之間的心跳檢測也是用這個方法的,這個方法呼叫了DefaultFuture類的received()方法:
/**
 * Completes the pending future registered under the response id, if there is one.
 *
 * <p>The future is removed from {@code FUTURES} before it is completed. When no future
 * is found, a warning is logged. The id is always removed from {@code CHANNELS}.
 *
 * @param channel  the channel the response arrived on; may be {@code null}
 * @param response the received response
 */
public static void received(Channel channel, Response response) {
    try {
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            future.doReceived(response);
        } else {
            logger.warn("The timeout response finally returned at "
                    + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                    + ", response " + response
                    + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                    + " -> " + channel.getRemoteAddress()));
        }
    } finally {
        CHANNELS.remove(response.getId());
    }
}
FUTURES是個<Long, DefaultFuture>型別的map,首先以Response的id為key,從這個map中取出並移除對應的DefaultFuture,然後呼叫DefaultFuture的doReceived()方法:
/**
 * Stores the response, signals the {@code done} condition under the lock, and
 * then invokes the callback (if any) after the lock has been released.
 *
 * @param res the response that completes this future
 */
private void doReceived(Response res) {
    lock.lock();
    try {
        response = res;
        if (null != done) {
            done.signal();
        }
    } finally {
        lock.unlock();
    }

    // The callback runs outside the lock.
    if (callback == null) {
        return;
    }
    invokeCallback(callback);
}
這個方法中用的lock物件是java的重入鎖,是java.util.concurrent.locks.ReentrantLock這個類的物件,done物件是lock.newCondition()生成的,是java.util.concurrent.locks.Condition這個類的物件,該方法中呼叫的done.signal()方法的作用是喚醒consumer的呼叫執行緒,這個呼叫執行緒是consumer把訊息傳送給provider之後進入阻塞狀態的,讓執行緒進入阻塞狀態的方法是DefaultFuture類的get()方法:
/**
 * Blocks until the response has arrived or the timeout elapses, then returns the
 * result carried by the response.
 *
 * @param timeout maximum time to wait in milliseconds; values {@code <= 0} are
 *                replaced by {@code Constants.DEFAULT_TIMEOUT}
 * @return the result extracted by {@code returnFromResponse()}
 * @throws RemotingException if no response arrived in time (as a {@code TimeoutException})
 *                           or the response reports a failure
 * @throws RuntimeException  if the waiting thread is interrupted; the interrupt status
 *                           is restored before throwing
 */
public Object get(int timeout) throws RemotingException {
    if (timeout <= 0) {
        timeout = Constants.DEFAULT_TIMEOUT;
    }
    if (!isDone()) {
        long start = System.currentTimeMillis();
        lock.lock();
        try {
            while (!isDone()) {
                // Wait only for what is left of the budget, so an early wakeup
                // cannot stretch the total wait beyond the timeout.
                long remaining = timeout - (System.currentTimeMillis() - start);
                if (remaining <= 0) {
                    break;
                }
                done.await(remaining, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up can observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
        if (!isDone()) {
            throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
        }
    }
    return returnFromResponse();
}
其中done.await方法會使執行緒阻塞,但是不會一直阻塞,如果在設定的超時時間之後執行緒依然在阻塞狀態,則自動喚醒執行緒,後面的isDone()方法其實只判斷了response是否是null,如果執行緒因為超時而被喚醒,這個時候Response還是null,就會丟擲呼叫超時的異常,最後,呼叫returnFromResponse方法,返回Response中的result,程式碼如下:
/**
 * Converts the stored response into a return value or an exception.
 *
 * @return the response result when the status is {@code Response.OK}
 * @throws IllegalStateException if no response has been stored
 * @throws RemotingException     a {@code TimeoutException} for client/server timeout
 *                               statuses, otherwise a plain {@code RemotingException}
 *                               carrying the response's error message
 */
private Object returnFromResponse() throws RemotingException {
    final Response res = response;
    if (res == null) {
        throw new IllegalStateException("response cannot be null");
    }

    if (res.getStatus() == Response.OK) {
        return res.getResult();
    }

    final boolean clientTimeout = res.getStatus() == Response.CLIENT_TIMEOUT;
    final boolean serverTimeout = res.getStatus() == Response.SERVER_TIMEOUT;
    if (clientTimeout || serverTimeout) {
        // The first argument tells the exception whether the timeout was on the server side.
        throw new TimeoutException(serverTimeout, channel, res.getErrorMessage());
    }

    throw new RemotingException(channel, res.getErrorMessage());
}
該返物件返物件,該返異常返異常,至此dubbo遠端呼叫的過程就結束了。