JAVA RPC (六) 之thrift反序列化RPC消息體
阿新 • • 發佈:2019-04-07
tee 直觀 iss 關鍵點 pre version 頁面 item eth
我們來看一下服務端的簡單實現,直接上thrift代碼,很直觀的來看一看thrift的server到底幹了些什麽
1 public boolean process(TProtocol in, TProtocol out) throws TException { 2 TMessage msg = in.readMessageBegin(); 3 ProcessFunction fn = (ProcessFunction)this.processMap.get(msg.name); 4 if (fn == null) { 5 TProtocolUtil.skip(in, (byte)12); 6 in.readMessageEnd(); 7 TApplicationException x = new TApplicationException(1, "Invalid method name: ‘" + msg.name + "‘"); 8 out.writeMessageBegin(new TMessage(msg.name, (byte)3, msg.seqid)); 9 x.write(out); 10 out.writeMessageEnd();11 out.getTransport().flush(); 12 return true; 13 } else { 14 fn.process(msg.seqid, in, out, this.iface); 15 return true; 16 } 17 }
TMessage msg = in.readMessageBegin();這段代碼的意思是從client端獲取到二進制數據時候,獲取Tmessage實例,和client的wirteMessage方法進行對應,聰明的小夥伴這個時候一定會想到這個是怎麽解析的
為什麽沒有解析消息體長度和消息體內容的部分代碼,不要著急我們一步一步滲透,readMessageBegin核心代碼如下
1 public TMessage readMessageBegin() throws TException { 2 int size = this.readI32(); 3 if (size < 0) { 4 int version = size & -65536; 5 if (version != -2147418112) { 6 throw new TProtocolException(4, "Bad version in readMessageBegin"); 7 } else { 8 return new TMessage(this.readString(), (byte)(size & 255), this.readI32()); 9 } 10 } else if (this.strictRead_) { 11 throw new TProtocolException(4, "Missing version in readMessageBegin, old client?"); 12 } else { 13 return new TMessage(this.readStringBody(size), this.readByte(), this.readI32()); 14 } 15 }
int size = this.readI32();是做了些什麽,接著往下看
1 public int readI32() throws TException { 2 byte[] buf = this.i32rd; 3 int off = 0; 4 if (this.trans_.getBytesRemainingInBuffer() >= 4) { 5 buf = this.trans_.getBuffer(); 6 off = this.trans_.getBufferPosition(); 7 this.trans_.consumeBuffer(4); 8 } else { 9 this.readAll(this.i32rd, 0, 4); 10 } 11 12 return (buf[off] & 255) << 24 | (buf[off + 1] & 255) << 16 | (buf[off + 2] & 255) << 8 | buf[off + 3] & 255; 13 }
這個地方代碼就很清晰了,原來是從trans裏面或取得字節流數據。
1 private int readAll(byte[] buf, int off, int len) throws TException { 2 this.checkReadLength(len); 3 return this.trans_.readAll(buf, off, len); 4 }
繼續能翻到最終的代碼解析部分
private void readFrame() throws TTransportException { this.transport_.readAll(this.i32buf, 0, 4); int size = decodeFrameSize(this.i32buf); if (size < 0) { throw new TTransportException("Read a negative frame size (" + size + ")!"); } else if (size > this.maxLength_) { throw new TTransportException("Frame size (" + size + ") larger than max length (" + this.maxLength_ + ")!"); } else { byte[] buff = new byte[size]; this.transport_.readAll(buff, 0, size); this.readBuffer_.reset(buff); } }
這裏就很明顯了,首先read一個四個字節的長度,這個int類型代表著後面要跟著接收的消息體的長度,來源是client的發送內容,然後在根據消息體長度在讀出來所有的內容,然後緩存到readBuffer_中,然後再讀取內容的時候直接從readBuffer_中獲取字節流就可以了。
好了,服務端接收二進制數據解析的關鍵點就在這裏了
https://gitee.com/a1234567891/koalas-rpc
koalas-RPC 個人作品,提供大家交流學習,有意見請私信,歡迎拍磚。客戶端采用thrift協議,服務端支持netty和thrift的TThreadedSelectorServer半同步半異步線程模型,支持動態擴容,服務上下線,權重動態,可用性配置,頁面流量統計等,持續為個人以及中小型公司提供可靠的RPC框架技術方案
更多學習內容請加高級java QQ群:825199617
JAVA RPC (六) 之thrift反序列化RPC消息體