1. 程式人生 > >JAVA RPC (六) 之thrift反序列化RPC消息體

JAVA RPC (六) 之thrift反序列化RPC消息體

tee 直觀 iss 關鍵點 pre version 頁面 item eth

我們來看一下服務端的簡單實現,直接上thrift代碼,很直觀的來看一看thrift的server到底幹了些什麽

 1  public boolean process(TProtocol in, TProtocol out) throws TException {
 2         TMessage msg = in.readMessageBegin();
 3         ProcessFunction fn = (ProcessFunction)this.processMap.get(msg.name);
 4         if (fn == null) {
 5             TProtocolUtil.skip(in, (byte
)12); 6 in.readMessageEnd(); 7 TApplicationException x = new TApplicationException(1, "Invalid method name: ‘" + msg.name + "‘"); 8 out.writeMessageBegin(new TMessage(msg.name, (byte)3, msg.seqid)); 9 x.write(out); 10 out.writeMessageEnd();
11 out.getTransport().flush(); 12 return true; 13 } else { 14 fn.process(msg.seqid, in, out, this.iface); 15 return true; 16 } 17 }
TMessage msg = in.readMessageBegin();這段代碼的意思是從client端獲取到二進制數據時候,獲取Tmessage實例,和client的wirteMessage方法進行對應,聰明的小夥伴這個時候一定會想到這個是怎麽解析的
為什麽沒有解析消息體長度和消息體內容的部分代碼,不要著急我們一步一步滲透,
readMessageBegin核心代碼如下
 1  public TMessage readMessageBegin() throws TException {
 2         int size = this.readI32();
 3         if (size < 0) {
 4             int version = size & -65536;
 5             if (version != -2147418112) {
 6                 throw new TProtocolException(4, "Bad version in readMessageBegin");
 7             } else {
 8                 return new TMessage(this.readString(), (byte)(size & 255), this.readI32());
 9             }
10         } else if (this.strictRead_) {
11             throw new TProtocolException(4, "Missing version in readMessageBegin, old client?");
12         } else {
13             return new TMessage(this.readStringBody(size), this.readByte(), this.readI32());
14         }
15     }
int size = this.readI32();是做了些什麽,接著往下看
 1 public int readI32() throws TException {
 2         byte[] buf = this.i32rd;
 3         int off = 0;
 4         if (this.trans_.getBytesRemainingInBuffer() >= 4) {
 5             buf = this.trans_.getBuffer();
 6             off = this.trans_.getBufferPosition();
 7             this.trans_.consumeBuffer(4);
 8         } else {
 9             this.readAll(this.i32rd, 0, 4);
10         }
11 
12         return (buf[off] & 255) << 24 | (buf[off + 1] & 255) << 16 | (buf[off + 2] & 255) << 8 | buf[off + 3] & 255;
13     }

這個地方代碼就很清晰了,原來是從trans裏面或取得字節流數據。

1 private int readAll(byte[] buf, int off, int len) throws TException {
2         this.checkReadLength(len);
3         return this.trans_.readAll(buf, off, len);
4     }

繼續能翻到最終的代碼解析部分

private void readFrame() throws TTransportException {
        this.transport_.readAll(this.i32buf, 0, 4);
        int size = decodeFrameSize(this.i32buf);
        if (size < 0) {
            throw new TTransportException("Read a negative frame size (" + size + ")!");
        } else if (size > this.maxLength_) {
            throw new TTransportException("Frame size (" + size + ") larger than max length (" + this.maxLength_ + ")!");
        } else {
            byte[] buff = new byte[size];
            this.transport_.readAll(buff, 0, size);
            this.readBuffer_.reset(buff);
        }
    }

這裏就很明顯了,首先read一個四個字節的長度,這個int類型代表著後面要跟著接收的消息體的長度,來源是client的發送內容,然後在根據消息體長度在讀出來所有的內容,然後緩存到readBuffer_中,然後再讀取內容的時候直接從readBuffer_中獲取字節流就可以了。

好了,服務端接收二進制數據解析的關鍵點就在這裏了

https://gitee.com/a1234567891/koalas-rpc

koalas-RPC 個人作品,提供大家交流學習,有意見請私信,歡迎拍磚。客戶端采用thrift協議,服務端支持netty和thrift的TThreadedSelectorServer半同步半異步線程模型,支持動態擴容,服務上下線,權重動態,可用性配置,頁面流量統計等,持續為個人以及中小型公司提供可靠的RPC框架技術方案

更多學習內容請加高級java QQ群:825199617

 












JAVA RPC (六) 之thrift反序列化RPC消息體