1. 程式人生 > >golang thrift 原始碼分析,伺服器和客戶端究竟是如何工作的

golang thrift 原始碼分析,伺服器和客戶端究竟是如何工作的

首先編寫thrift檔案(rpcserver.thrift),執行thrift --gen go rpcserver.thrift,生成程式碼

namespace go rpc

service RpcService {
    string SayHi(1: string name);
    void SayHello(1: string name);
}

搭建一個以二進位制為傳輸協議的伺服器如下:

複製程式碼
type rpcService struct{
}

func (this *rpcService)SayHi(name string)(r string, err error){
    fmt.Println(
"Hi ", name) r = "Hello "+name err = nil return } func (this *rpcService)SayHello(name string)err error{ fmt.Println("Hello ", name) err = nil return } func StartServer(){ serverTransport, err := thrift.NewTServerSocket("127.0.0.1:8808") if err != nil { fmt.Println(
"Error!", err) return } handler := &rpcService{} processor := NewRpcServiceProcessor(handler) server := thrift.NewTSimpleServer2(processor, serverTransport) fmt.Println("thrift server in localhost") server.Serve() }
複製程式碼

檢視自動生成的程式碼recserver.go,我們發現 NewRpcServiceProcessor函式程式碼如下:

複製程式碼
func NewRpcServiceProcessor(handler RpcService) *RpcServiceProcessor {

    self4 := &RpcServiceProcessor{handler: handler, processorMap: make(map[string]thrift.TProcessorFunction)}
    self4.processorMap["SayHi"] = &rpcServiceProcessorSayHi{handler: handler}
    self4.processorMap["SayHello"] = &rpcServiceProcessorSayHello{handler: handler}
    return self4
}
複製程式碼

也就是說,thrift通過key-value儲存了我們實際將要執行的函式,最終通過handler來執行。

這裡就有點像我們使用golang系統中的http包中的ListenAndServer()函式時,提前通過Handfunc來設定好函式路由是一個意思。

再看看Serve()函式是如何實現的:

複製程式碼
func (p *TSimpleServer) Serve() error {
    err := p.Listen()
    if err != nil {
        return err
    }
    p.AcceptLoop()
    return nil
}

func (p *TSimpleServer) AcceptLoop() error {
    for {
        client, err := p.serverTransport.Accept()
if err != nil{
//.......被我刪掉
}
if client != nil { go func() { if err := p.processRequests(client); err != nil { log.Println("error processing request:", err) } }() } } }
複製程式碼

Serve()函式負責監聽連線到伺服器上的client,並且通過processRequests()函式來處理client。實際處理過程中,伺服器會獲取client的processor,然後進一步處理client的請求。這部分先暫停一下,我們來分析一下client端的工作原理,之後再回過頭來看看會比較清晰一些

首先我們架設client端如下,並且通過client端來發送一個SayHi的操作:   

複製程式碼
    transport, err := thrift.NewTSocket(net.JoinHostPort("127.0.0.1", "8808"))
if
err != nil { //... } protocolFactory := thrift.NewTBinaryProtocolFactoryDefault() client := NewRpcServiceClientFactory(transport, protocolFactory) if err := transport.Open(); err != nil { //... } defer transport.Close()
res, _ := client.SayHi("wordl")
複製程式碼

現在問題來了,這個SayHi是如何通知給伺服器的呢?不急,看原始碼

在我們呼叫thrift --gen go XXX命令的時候,thrift已經給我們生成了SayHi過程的程式碼,如下:

func (p *RpcServiceClient) SayHi(name string) (r string, err error) {
    if err = p.sendSayHi(name); err != nil {
        return
    }
    return p.recvSayHi()
}

其中RpcServiceClient型別就是我們的client,可以看到先呼叫了一個sendSayHi,如果沒有錯誤的話,又呼叫了一個recvSayHi。

其實sendSayHi就是我們通知伺服器執行SayHi()函式的關鍵,而recvSayHi是接受伺服器的執行結果的。

一起看下sendSayHi是如何實現的(程式碼被我精簡,這保留了關鍵部分,完整程式碼可以自己通過thrift命令生成檢視)

複製程式碼
func (p *RpcServiceClient) sendSayHi(name string) (err error) {
    oprot := p.OutputProtocol  //獲取傳輸協議
    
    if err = oprot.WriteMessageBegin("SayHi", thrift.CALL, p.SeqId); err != nil { //傳送SayHi字元創,告訴伺服器將來執行的函式
        return
    }
    args := RpcServiceSayHiArgs{ //構建引數
        Name: name,
    }
    if err = args.Write(oprot); err != nil {  //將引數傳送給伺服器
        return
    }
    if err = oprot.WriteMessageEnd(); err != nil { //通知伺服器傳送完畢
        return
    }
    return oprot.Flush()
}
複製程式碼

通過這樣的一系列資料傳輸,伺服器通過路由解析,便可以正確的知道該執行哪個函數了。thrift的精髓也正在此,實現了rpc架構,客戶端只需要簡單的呼叫client.SayHi(),不必知道這是本地呼叫還是遠端呼叫。

好了,既然請求發出了,我們現在當然看看伺服器是如何響應的,在原始碼中,有一個函式是專門響應客戶端請求的:

func (p *RpcServiceProcessor) Process(iprot, oprot thrift.TProtocol) (success bool, err thrift.TException)

前面講解伺服器端是如何建立的時候講到過一個processRequests()函式,它在client連線上server的時候會被server呼叫。我們看看原始碼:

複製程式碼
func (p *TSimpleServer) processRequests(client TTransport) error {
    processor := p.processorFactory.GetProcessor(client)
    //....
    for {
        ok, err := processor.Process(inputProtocol, outputProtocol)
        if err{
                  //....
        }
    }
    return nil
}
複製程式碼

在去除無關程式碼之後我們看到,伺服器首先獲取客戶端的processor,然後呼叫processor的Process函式,從而執行響應客戶端的請求。

看看Process函式具體是如何實現的:

複製程式碼
func (p *RpcServiceProcessor) Process(iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
    name, _, seqId, err := iprot.ReadMessageBegin() //獲取客戶端請求執行的函式名稱
    if err != nil {
        return false, err
    }
    if processor, ok := p.GetProcessorFunction(name); ok {
        return processor.Process(seqId, iprot, oprot)  //執行
    }
    //...
}
複製程式碼

要注意的是,函式中用紅色標註的Process是另外一個函式,這裡可不是遞迴。兩個Process函式的宣告是不一樣的:

func (p *RpcServiceProcessor) Process(iprot, oprot thrift.TProtocol) (success bool, err thrift.TException)  //RpcServiceProcessor是server的processor

func (p *rpcServiceProcessorSayHi) Process(seqId int32, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) //rpcServiceProcessorSayHi是具體的handler,

現在到了最關鍵的時候了,我們看看handler是如何執行process的:

複製程式碼
func (p *rpcServiceProcessorSayHi) Process(seqId int32, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
    args := RpcServiceSayHiArgs{} //構建引數
    if err = args.Read(iprot); err != nil { //讀取客戶端發來的引數
        //處理err
    }

    iprot.ReadMessageEnd()  //讀取客戶端的結束訊息
    result := RpcServiceSayHiResult{} 
    var retval string
    var err2 error
    if retval, err2 = p.handler.SayHi(args.Name); err2 != nil { //執行函式
        //..處理err
    } else {
        result.Success = &retval
    }
    //...將result傳送給客戶端,流程和client傳送請求類似,client通過recvSayHi()函式接受result
    return true, err
}
複製程式碼

現在,伺服器和客戶端究竟是如何工作的。你明白了嗎