go語言實現自己的RPC:go rpc codec
RPC是遠端過程呼叫(Remote Procedure Call)的簡稱,通過RPC我們可以像呼叫本地方法一樣呼叫位於其他位置的函式。大家更常見的可能是HTTP API呼叫,簡單來對比的話,RPC比起HTTP呼叫封裝更完善,呼叫者不必手動處理序列化和反序列化,使用成本更低一些(雖然學習成本可能會更高)。
出於學習目的,這次的目標是使用go語言來實現一個自己的RPC。在現實世界裡,對於一個RPC工具,除了方法呼叫以外,人們更看重的是其他功能比如服務發現、負載均衡、熔斷降級之類的功能,這裡暫時不會涉及,而是僅關注實現一個可以工作的方法呼叫。
在之前的文章裡大致瞭解了go語言自帶的rpc框架,其中就提到go rpc預留了codec介面,可以讓使用者在go rpc使用自己的序列化協議,這次就嘗試實現一個自己的codec來實現自己的RPC。
準備工作
序列化協議
要實現一個RPC,基本的元素大概有這幾個:序列化協議、網路模型和執行緒模型。而go rpc裡的codec基本上實現的就是序列化協議。
本來想著用比較熟悉的thrift協議,但是使用thrift本身實現了RPC流程,所以它並不是一個單純的序列化協議,它的序列化邏輯可能無法和go rpc很好的契合,再加上還需要書寫IDL定義,增加複雜度。本來就是為了熟悉go,所以這裡先從簡單的開始,於是選擇messagepack作為序列化協議。
messagepack是一個比較輕量級的序列化協議,它的邏輯和json類似,但是使用的是二進位制形式,所以比json序列化更快,序列化後產生的資料也更小,基本上可以認為是一個二進位制版本的json。
建立類定義
要實現自己的codec,需要分別實現go rpc中提供個兩個介面:ServerCodec和ClientCodec,很明顯他們分別表示服務端和客戶端的邏輯,兩個介面的定義具體如下:
type ServerCodec interface { ReadRequestHeader(*Request) error ReadRequestBody(interface{}) error WriteResponse(*Response, interface{}) error Close() error } type ClientCodec interface { WriteRequest(*Request, interface{}) error ReadResponseHeader(*Response) error ReadResponseBody(interface{}) error Close() error } 複製程式碼
可以看到,go rpc將一次請求/響應抽象成了header+body的形式,讀取資料時分為讀取head和讀取body,寫入資料時只需寫入body部分,go rpc會替我們加上head部分。 接下來我們定義兩個結構,用來表示一次請求/響應的完整資料:
type MsgpackReq struct { rpc.Request//head Arg interface{} //body } type MsgpackResp struct { rpc.Response//head Reply interface{}//body } 複製程式碼
這裡的msgpackReq和msgpackResp直接內嵌了go rpc裡自帶的Request和Response,自帶的Request和Response定義了序號、方法名等資訊。
接下來就是自定義Codec的宣告:
type MessagePackServerCodec struct { rwcio.ReadWriteCloser //用於讀寫資料,實際是一個網路連線 reqMsgpackReq //用於快取解析到的請求 closed bool//標識codec是否關閉 } type MessagePackClientCodec struct { rwcio.ReadWriteCloser respMsgpackResp//用於快取解析到的請求 closed bool } func NewServerCodec(conn net.Conn) *MessagePackServerCodec { return &MessagePackServerCodec{conn, MsgpackReq{}, false} } func NewClientCodec(conn net.Conn) *MessagePackClientCodec { return &MessagePackClientCodec{conn, MsgpackResp{}, false} } 複製程式碼
在之前的文章裡提到了,codec需要包含一個數據源用於讀寫資料,這裡直接將網路連線傳遞進去。
實現Codec方法
實現思路
接下來是具體的方法實現,處於簡單起見,這裡將反序列化部分的兩步合併為一步,在讀取head部分時就將所有的資料解析好並快取起來,讀取body時直接返回快取的結果。具體的思路就是:
- 客戶端在傳送請求時,將資料包裝成一個MsgpackReq,然後用messagepack序列化併發送出去
- 服務端在讀取請求head部分時,將收到的資料用messagepack反序列化成一個MsgpackReq,並將得到的結果快取起來
- 服務端在讀取請求body部分時,從快取的MsgpackReq中獲取到Arg欄位並返回
- 服務端在傳送響應時,將資料包裝成一個MsgpackResp,然後用messagepack序列化併發送出去
- 客戶端在讀取響應head部分時,將收到的資料用messagepack反序列化成一個MsgpackResp,並將得到的結果快取起來
- 客戶端在讀取響應body部分時,從快取的MsgpackResp中獲取到Reply或者Error欄位並返回
Client實現
這裡直接上程式碼:
func (c *MessagePackClientCodec) WriteRequest(r *rpc.Request, arg interface{}) error { //先判斷codec是否已經關閉,如果是則直接返回 if c.closed { return nil } //將r和arg組裝成一個MsgpackReq並序列化 request := &MsgpackReq{*r, arg} reqData, err := msgpack.Marshal(request) if err != nil { panic(err) return err } //先發送資料長度 head := make([]byte, 4) binary.BigEndian.PutUint32(head, uint32(len(reqData))) _, err = c.rwc.Write(head) //再將序列化產生的資料傳送出去 _, err = c.rwc.Write(reqData) return err } func (c *MessagePackClientCodec) ReadResponseHeader(r *rpc.Response) error { //先判斷codec是否已經關閉,如果是則直接返回 if c.closed { return nil } //讀取資料 data, err := readData(c.rwc) if err != nil { //client一旦初始化就會開始輪詢資料,所以要處理連線close的情況 if strings.Contains(err.Error(), "use of closed network connection") { return nil } panic(err) //簡單起見,出現異常直接panic } //將讀取到的資料反序列化成一個MsgpackResp var response MsgpackResp err = msgpack.Unmarshal(data, &response) if err != nil { panic(err) //簡單起見,出現異常直接panic } //根據讀取到的資料設定request的各個屬性 r.ServiceMethod = response.ServiceMethod r.Seq = response.Seq //同時將讀取到的資料快取起來 c.resp = response return nil } func (c *MessagePackClientCodec) ReadResponseBody(reply interface{}) error { //這裡直接用快取的資料返回即可 if "" != c.resp.Error {//如果返回的是異常 return errors.New(c.resp.Error) } if reply != nil { //正常返回,通過反射將結果設定到reply變數,因為reply一定是指標型別,所以不必檢查CanSet reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(c.resp.Reply)) } return nil } func (c *MessagePackClientCodec) Close() error { c.closed = true //關閉時將closed設定為true if c.rwc != nil { return c.rwc.Close() } return nil } 複製程式碼
以上就是client部分的實現,值得注意的有幾點:
- 讀寫資料前,需要檢查codec是否已經關閉了
- 讀寫資料時需要處理拆包粘包(通過readData函式處理)
Server實現
同樣直接上程式碼:
func (c *MessagePackServerCodec) WriteResponse(r *rpc.Response, reply interface{}) error { //先判斷codec是否已經關閉,如果是則直接返回 if c.closed { return nil } //將r和reply組裝成一個MsgpackResp並序列化 response := &MsgpackResp{*r, reply} respData, err := msgpack.Marshal(response) if err != nil { panic(err) return err } head := make([]byte, 4) binary.BigEndian.PutUint32(head, uint32(len(respData))) _, err = c.rwc.Write(head) //將序列化產生的資料傳送出去 _, err = c.rwc.Write(respData) return err } func (c *MessagePackServerCodec) ReadRequestHeader(r *rpc.Request) error { //先判斷codec是否已經關閉,如果是則直接返回 if c.closed { return nil } //讀取資料 data, err := readData(c.rwc) if err != nil { //這裡不能直接panic,需要處理EOF和reset的情況 if err == io.EOF { return err } if strings.Contains(err.Error(), "connection reset by peer") { return err } panic(err) //其他異常直接panic } //將讀取到的資料反序列化成一個MsgpackReq var request MsgpackReq err = msgpack.Unmarshal(data, &request) if err != nil { panic(err) //簡單起見,出現異常直接panic } //根據讀取到的資料設定request的各個屬性 r.ServiceMethod = request.ServiceMethod r.Seq = request.Seq //同時將解析到的資料快取起來 c.req = request return nil } func (c *MessagePackServerCodec) ReadRequestBody(arg interface{}) error { if arg != nil { //引數不為nil,通過反射將結果設定到arg變數 reflect.ValueOf(arg).Elem().Set(reflect.ValueOf(c.req.Arg)) } return nil } func (c *MessagePackServerCodec) Close() error { c.closed = true if c.rwc != nil { return c.rwc.Close() } return nil } 複製程式碼
實際上server端的實現幾乎和client端邏輯的一樣,只是request和response的角色不同而已。其中有幾點需要注意:
- server端讀取資料時需要處理EOF和連線reset的情況
- server在返回資料時沒有顯式處理介面產生的error,只是將reply傳遞了回去,這是因為error在rpc.Request裡存著,不用codec處理
處理拆包粘包
具體思路參考go語言處理TCP拆包/粘包 ,這裡附上readData的實現:
func readData(conn io.ReadWriteCloser) (data []byte, returnError error) { const HeadSize = 4 //設定長度部分佔4個位元組 headBuf := bytes.NewBuffer(make([]byte, 0, HeadSize)) headData := make([]byte, HeadSize) for { readSize, err := conn.Read(headData) if err != nil { returnError = err return } headBuf.Write(headData[0:readSize]) if headBuf.Len() == HeadSize { break } else { headData = make([]byte, HeadSize-readSize) } } bodyLen := int(binary.BigEndian.Uint32(headBuf.Bytes())) bodyBuf := bytes.NewBuffer(make([]byte, 0, bodyLen)) bodyData := make([]byte, bodyLen) for { readSize, err := conn.Read(bodyData) if err != nil { returnError = err return } bodyBuf.Write(bodyData[0:readSize]) if bodyBuf.Len() == bodyLen { break } else { bodyData = make([]byte, bodyLen-readSize) } } data = bodyBuf.Bytes() returnError = nil return } 複製程式碼
測試程式碼
接下來我們通過簡單的Echo呼叫測試一下我們的codec:
//宣告介面類 type EchoService struct {} //定義方法Echo func (service *EchoService) Echo(arg string, result *string) error { *result = arg return nil } //服務端啟動邏輯 func RegisterAndServeOnTcp() { err := rpc.Register(&EchoService{})//註冊並不是註冊方法,而是註冊EchoService的一個例項 if err != nil { log.Fatal("error registering", err) return } tcpAddr, err := net.ResolveTCPAddr("tcp", ":1234") if err != nil { log.Fatal("error resolving tcp", err) } listener, err := net.ListenTCP("tcp", tcpAddr) for { conn, err := listener.Accept() if err != nil { log.Fatal("error accepting", err) } else { //這裡先通過NewServerCodec獲得一個例項,然後呼叫rpc.ServeCodec來啟動服務 rpc.ServeCodec(msgpk.NewServerCodec(conn)) } } } //客戶端呼叫邏輯 func CallEcho(method string, arg interface{}) (result interface{}, err error) { var client *rpc.Client conn, err := net.Dial("tcp", ":1234") client = rpc.NewClientWithCodec(msgpk.NewClientCodec(conn)) defer func() { conn.Close() client.Close() }() if err != nil { return "", err } err = client.Call(method, arg, &result) //通過型別加方法名指定要呼叫的方法 if err != nil { return "", err } return result, err } //main函式 func main() { go server.RegisterAndServeOnTcp() //先啟動服務端 time.Sleep(1e9) wg := new(sync.WaitGroup) //waitGroup用於阻塞主執行緒防止提前退出 callTimes := 10 wg.Add(callTimes) for i := 0; i < callTimes; i++ { go func() { //使用hello world加一個隨機數作為引數 argString := "hello world "+strconv.Itoa(rand.Int()) resultString, err := client.Echo(argString) if err != nil { log.Fatal("error calling:", err) } if resultString != argString { fmt.Println("error") } else { fmt.Printf("echo:%s\n", resultString) } wg.Done() }() } wg.Wait() } 複製程式碼
上面的例子裡首先通過go server.RegisterAndServeOnTcp()啟動了服務端,然後同時啟動了10個go routine來發起請求,客戶端在收到響應之後會列印對應的結果。最後執行main函式,控制檯會輸出結果(後面的隨機數可能會不同):
echo:hello world 8674665223082153551 echo:hello world 6129484611666145821 echo:hello world 5577006791947779410 echo:hello world 605394647632969758 echo:hello world 4037200794235010051 echo:hello world 3916589616287113937 echo:hello world 894385949183117216 echo:hello world 1443635317331776148 echo:hello world 2775422040480279449 echo:hello world 6334824724549167320 複製程式碼
結語
到這裡,一個簡單的自定義的go語言rpc就已經完成了,雖然自定義部分只有序列化協議部分而已,比如執行緒模型仍是go rpc自帶的邏輯,除此之外也沒有前言裡提到的各種高階功能。後續再考慮嘗試用go語言從零開始實現一個RPC吧。