【Zinx第五章-訊息封裝】Golang輕量級併發伺服器框架
接下來我們再對Zinx做一個簡單的升級,現在我們把伺服器的全部資料都放在一個Request裡,當前的Request結構如下:
type Request struct { conn ziface.IConnection //已經和客戶端建立好的連結 data []byte//客戶端請求的資料 }
很明顯,現在是用一個 []byte
來接受全部資料,又沒有長度,又沒有訊息型別,這不科學。怎麼辦呢?我們現在就要自定義一種訊息型別,把全部的訊息都放在這種訊息型別裡。
5.1 建立訊息封裝型別
在 zinx/ziface/
下建立 imessage.go
檔案
zinx/ziface/imessage.go
package ziface /* 將請求的一個訊息封裝到message中,定義抽象層介面 */ type IMessage interface { GetDataLen() uint32 //獲取訊息資料段長度 GetMsgId() uint32//獲取訊息ID GetData() []byte//獲取訊息內容 SetMsgId(uint32)//設計訊息ID SetData([]byte)//設計訊息內容 SetDataLen(uint32)//設定訊息資料段長度 }
同時建立例項message類,在 zinx/znet/
下,建立 message.go
檔案
zinx/znet/message.go
package znet type Message struct { Iduint32 //訊息的ID DataLen uint32 //訊息的長度 Data[]byte //訊息的內容 } //建立一個Message訊息包 func NewMsgPackage(id uint32, data []byte) *Message { return &Message{ Id:id, DataLen: uint32(len(data)), Data:data, } } //獲取訊息資料段長度 func (msg *Message) GetDataLen() uint32 { return msg.DataLen } //獲取訊息ID func (msg *Message) GetMsgId() uint32 { return msg.Id } //獲取訊息內容 func (msg *Message) GetData() []byte { return msg.Data } //設定訊息資料段長度 func (msg *Message) SetDataLen(len uint32) { msg.DataLen = len } //設計訊息ID func (msg *Message) SetMsgId(msgId uint32) { msg.Id = msgId } //設計訊息內容 func (msg *Message) SetData(data []byte) { msg.Data = data }
整理一個基本的message包,會包含 訊息ID , 資料 , 資料長度 三個成員,提供基本的setter和getter方法,目的是為了以後做封裝優化的作用。同時也提供了一個建立一個message包的初始化方法 NewMegPackage
。
5.2 訊息的封包與拆包
我們這裡就是採用經典的TLV(Type-Len-Value)封包格式來解決TCP粘包問題吧。

2-TCP粘包問題-拆包封包過程.jpeg
由於Zinx也是TCP流的形式傳播資料,難免會出現訊息1和訊息2一同傳送,那麼zinx就需要有能力區分兩個訊息的邊界,所以Zinx此時應該提供一個統一的拆包和封包的方法。在發包之前打包成如上圖這種格式的有head和body的兩部分的包,在收到資料的時候分兩次進行讀取,先讀取固定長度的head部分,得到後續Data的長度,再根據DataLen讀取之後的body。這樣就能夠解決粘包的問題了。
A) 建立拆包封包抽象類
在 zinx/ziface
下,建立 idatapack.go
檔案
zinx/ziface/idatapack.go
package ziface /* 封包資料和拆包資料 直接面向TCP連線中的資料流,為傳輸資料新增頭部資訊,用於處理TCP粘包問題。 */ type IDataPack interface{ GetHeadLen() uint32//獲取包頭長度方法 Pack(msg IMessage)([]byte, error)//封包方法 Unpack([]byte)(IMessage, error)//拆包方法 }
B) 實現拆包封包類
在 zinx/znet/
下,建立 datapack.go
檔案.
zinx/znet/datapack.go
package znet import ( "bytes" "encoding/binary" "errors" "zinx/utils" "zinx/ziface" ) //封包拆包類例項,暫時不需要成員 type DataPack struct {} //封包拆包例項初始化方法 func NewDataPack() *DataPack { return &DataPack{} } //獲取包頭長度方法 func(dp *DataPack) GetHeadLen() uint32 { //Id uint32(4位元組) +DataLen uint32(4位元組) return 8 } //封包方法(壓縮資料) func(dp *DataPack) Pack(msg ziface.IMessage)([]byte, error) { //建立一個存放bytes位元組的緩衝 dataBuff := bytes.NewBuffer([]byte{}) //寫msgID if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetMsgId()); err != nil { return nil, err } //寫dataLen if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetDataLen()); err != nil { return nil, err } //寫data資料 if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetData()); err != nil { return nil ,err } return dataBuff.Bytes(), nil } //拆包方法(解壓資料) func(dp *DataPack) Unpack(binaryData []byte)(ziface.IMessage, error) { //建立一個從輸入二進位制資料的ioReader dataBuff := bytes.NewReader(binaryData) //只解壓head的資訊,得到dataLen和msgID msg := &Message{} //讀msgID if err := binary.Read(dataBuff, binary.LittleEndian, &msg.Id); err != nil { return nil, err } //讀dataLen if err := binary.Read(dataBuff, binary.LittleEndian, &msg.DataLen); err != nil { return nil, err } //判斷dataLen的長度是否超出我們允許的最大包長度 if (utils.GlobalObject.MaxPacketSize > 0 && msg.DataLen > utils.GlobalObject.MaxPacketSize) { return nil, errors.New("Too large msg data recieved") } //這裡只需要把head的資料拆包出來就可以了,然後再通過head的長度,再從conn讀取一次資料 return msg, nil }
需要注意的是整理的 Unpack
方法,因為我們從上圖可以知道,我們進行拆包的時候是分兩次過程的,第二次是依賴第一次的dataLen結果,所以 Unpack
只能解壓出包頭head的內容,得到msgId 和 dataLen。之後呼叫者再根據dataLen繼續從io流中讀取body中的資料。
C) 測試拆包封包功能
為了容易理解,我們先不用整合zinx框架來測試,而是單獨寫一個Server和Client來測試一下封包拆包的功能
Server.go
package main import ( "fmt" "io" "net" "zinx/znet" ) //只是負責測試datapack拆包,封包功能 func main() { //建立socket TCP Server listener, err := net.Listen("tcp", "127.0.0.1:7777") if err != nil { fmt.Println("server listen err:", err) return } //建立伺服器gotoutine,負責從客戶端goroutine讀取粘包的資料,然後進行解析 for { conn, err := listener.Accept() if err != nil { fmt.Println("server accept err:", err) } //處理客戶端請求 go func(conn net.Conn) { //建立封包拆包物件dp dp := znet.NewDataPack() for { //1 先讀出流中的head部分 headData := make([]byte, dp.GetHeadLen()) _, err := io.ReadFull(conn, headData) //ReadFull 會把msg填充滿為止 if err != nil { fmt.Println("read head error") break } //將headData位元組流 拆包到msg中 msgHead, err := dp.Unpack(headData) if err != nil { fmt.Println("server unpack err:", err) return } if msgHead.GetDataLen() > 0 { //msg 是有data資料的,需要再次讀取data資料 msg := msgHead.(*znet.Message) msg.Data = make([]byte, msg.GetDataLen()) //根據dataLen從io中讀取位元組流 _, err := io.ReadFull(conn, msg.Data) if err != nil { fmt.Println("server unpack data err:", err) return } fmt.Println("==> Recv Msg: ID=", msg.Id, ", len=", msg.DataLen, ", data=", string(msg.Data)) } } }(conn) } }
Client.go
package main import ( "fmt" "net" "zinx/znet" ) func main() { //客戶端goroutine,負責模擬粘包的資料,然後進行傳送 conn, err := net.Dial("tcp", "127.0.0.1:7777") if err != nil { fmt.Println("client dial err:", err) return } //建立一個封包物件 dp dp := znet.NewDataPack() //封裝一個msg1包 msg1 := &znet.Message{ Id:0, DataLen: 5, Data:[]byte{'h', 'e', 'l', 'l', 'o'}, } sendData1, err := dp.Pack(msg1) if err != nil { fmt.Println("client pack msg1 err:", err) return } msg2 := &znet.Message{ Id:1, DataLen: 7, Data:[]byte{'w', 'o', 'r', 'l', 'd', '!', '!'}, } sendData2, err := dp.Pack(msg2) if err != nil { fmt.Println("client temp msg2 err:", err) return } //將sendData1,和 sendData2 拼接一起,組成粘包 sendData1 = append(sendData1, sendData2...) //向伺服器端寫資料 conn.Write(sendData1) //客戶端阻塞 select {} }
執行Server.go
go run Server.go
執行Client.go
go run Client.go
我們從服務端看到執行結果
$go run Server.go ==> Recv Msg: ID= 0 , len= 5 , data= hello ==> Recv Msg: ID= 1 , len= 7 , data= world!!
我們成功的得到了客戶端傳送的兩個包,並且成功的解析出來。
5.3 Zinx-V0.5程式碼實現
現在我們需要把封包和拆包的功能整合到Zinx中,並且測試Zinx該功能是否生效。
A) Request欄位修改
首先我們要將我們之前的Request中的 []byte
型別的data欄位改成Message型別.
zinx/znet/request.go
package znet import "zinx/ziface" type Request struct { conn ziface.IConnection //已經和客戶端建立好的 連結 msg ziface.IMessage//客戶端請求的資料 } //獲取請求連線資訊 func(r *Request) GetConnection() ziface.IConnection { return r.conn } //獲取請求訊息的資料 func(r *Request) GetData() []byte { return r.msg.GetData() } //獲取請求的訊息的ID func (r *Request) GetMsgID() uint32 { return r.msg.GetMsgId() }
B) 整合拆包過程
接下來我們需要在Connection的 StartReader()
方法中,修改之前的讀取客戶端的這段程式碼:
func (c *Connection) StartReader() { //... for{ //讀取我們最大的資料到buf中 buf := make([]byte, utils.GlobalObject.MaxPacketSize) _, err := c.Conn.Read(buf) if err != nil { fmt.Println("recv buf err ", err) c.ExitBuffChan <- true continue } //... } }
改成如下:
zinx/znet/connection.go
StartReader()方法
func (c *Connection) StartReader() { fmt.Println("Reader Goroutine isrunning") defer fmt.Println(c.RemoteAddr().String(), " conn reader exit!") defer c.Stop() for{ // 建立拆包解包的物件 dp := NewDataPack() //讀取客戶端的Msg head headData := make([]byte, dp.GetHeadLen()) if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil { fmt.Println("read msg head error ", err) c.ExitBuffChan <- true continue } //拆包,得到msgid 和 datalen 放在msg中 msg , err := dp.Unpack(headData) if err != nil { fmt.Println("unpack error ", err) c.ExitBuffChan <- true continue } //根據 dataLen 讀取 data,放在msg.Data中 var data []byte if msg.GetDataLen() > 0 { data = make([]byte, msg.GetDataLen()) if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil { fmt.Println("read msg data error ", err) c.ExitBuffChan <- true continue } } msg.SetData(data) //得到當前客戶端請求的Request資料 req := Request{ conn:c, msg:msg, //將之前的buf 改成 msg } //從路由Routers 中找到註冊繫結Conn的對應Handle go func (request ziface.IRequest) { //執行註冊的路由方法 c.Router.PreHandle(request) c.Router.Handle(request) c.Router.PostHandle(request) }(&req) } }
C) 提供封包方法
現在我們已經將拆包的功能整合到Zinx中了,但是使用Zinx的時候,如果我們希望給使用者返回一個TLV格式的資料,總不能每次都經過這麼繁瑣的過程,所以我們應該給Zinx提供一個封包的介面,供Zinx發包使用。
zinx/ziface/iconnection.go
新增 SendMsg()
方法
package ziface import "net" //定義連線介面 type IConnection interface { //啟動連線,讓當前連線開始工作 Start() //停止連線,結束當前連線狀態M Stop() //從當前連接獲取原始的socket TCPConn GetTCPConnection() *net.TCPConn //獲取當前連線ID GetConnID() uint32 //獲取遠端客戶端地址資訊 RemoteAddr() net.Addr //直接將Message資料傳送資料給遠端的TCP客戶端 SendMsg(msgId uint32, data []byte) error }
zinx/znet/connection.go
SendMsg()
方法實現:
//直接將Message資料傳送資料給遠端的TCP客戶端 func (c *Connection) SendMsg(msgId uint32, data []byte) error { if c.isClosed == true { return errors.New("Connection closed when send msg") } //將data封包,並且傳送 dp := NewDataPack() msg, err := dp.Pack(NewMsgPackage(msgId, data)) if err != nil { fmt.Println("Pack error msg id = ", msgId) returnerrors.New("Pack error msg ") } //寫回客戶端 if _, err := c.Conn.Write(msg); err != nil { fmt.Println("Write msg id ", msgId, " error ") c.ExitBuffChan <- true return errors.New("conn Write error") } return nil }
5.4 使用Zinx-V0.5完成應用程式
現在我們可以基於Zinx框架完成傳送msg功能的測試用例了。
Server.go
package main import ( "fmt" "zinx/ziface" "zinx/znet" ) //ping test 自定義路由 type PingRouter struct { znet.BaseRouter } //Test Handle func (this *PingRouter) Handle(request ziface.IRequest) { fmt.Println("Call PingRouter Handle") //先讀取客戶端的資料,再回寫ping...ping...ping fmt.Println("recv from client : msgId=", request.GetMsgID(), ", data=", string(request.GetData())) //回寫資料 err := request.GetConnection().SendMsg(1, []byte("ping...ping...ping")) if err != nil { fmt.Println(err) } } func main() { //建立一個server控制代碼 s := znet.NewServer() //配置路由 s.AddRouter(&PingRouter{}) //開啟服務 s.Serve() }
當前Server端是先把客戶端傳送來Msg解析,然後返回一個MsgId為1的訊息,訊息內容是"ping...ping...ping"
Client.go
package main import ( "fmt" "io" "net" "time" "zinx/znet" ) /* 模擬客戶端 */ func main() { fmt.Println("Client Test ... start") //3秒之後發起測試請求,給服務端開啟服務的機會 time.Sleep(3 * time.Second) conn,err := net.Dial("tcp", "127.0.0.1:7777") if err != nil { fmt.Println("client start err, exit!") return } for { //發封包message訊息 dp := znet.NewDataPack() msg, _ := dp.Pack(znet.NewMsgPackage(0,[]byte("Zinx V0.5 Client Test Message"))) _, err := conn.Write(msg) if err !=nil { fmt.Println("write error err ", err) return } //先讀出流中的head部分 headData := make([]byte, dp.GetHeadLen()) _, err = io.ReadFull(conn, headData) //ReadFull 會把msg填充滿為止 if err != nil { fmt.Println("read head error") break } //將headData位元組流 拆包到msg中 msgHead, err := dp.Unpack(headData) if err != nil { fmt.Println("server unpack err:", err) return } if msgHead.GetDataLen() > 0 { //msg 是有data資料的,需要再次讀取data資料 msg := msgHead.(*znet.Message) msg.Data = make([]byte, msg.GetDataLen()) //根據dataLen從io中讀取位元組流 _, err := io.ReadFull(conn, msg.Data) if err != nil { fmt.Println("server unpack data err:", err) return } fmt.Println("==> Recv Msg: ID=", msg.Id, ", len=", msg.DataLen, ", data=", string(msg.Data)) } time.Sleep(1*time.Second) } }
這裡Client客戶端,模擬了一個MsgId為0的"Zinx V0.5 Client Test Message"訊息,然後把服務端返回的資料打印出來。
我們分別在兩個終端執行
$go run Server.go
$go run Client.go
服務端結果:
$ go run Server.go Add Router succ! [START] Server name: zinx v-0.5 demoApp,listenner at IP: 127.0.0.1, Port 7777 is starting [Zinx] Version: V0.4, MaxConn: 3, MaxPacketSize: 4096 start Zinx serverzinx v-0.5 demoAppsucc, now listenning... Reader Goroutine isrunning Call PingRouter Handle recv from client : msgId= 0 , data= Zinx V0.5 Client Test Message Call PingRouter Handle recv from client : msgId= 0 , data= Zinx V0.5 Client Test Message Call PingRouter Handle recv from client : msgId= 0 , data= Zinx V0.5 Client Test Message ...
客戶端結果:
$ go run Client.go Client Test ... start ==> Recv Msg: ID= 1 , len= 18 , data= ping...ping...ping ==> Recv Msg: ID= 1 , len= 18 , data= ping...ping...ping ==> Recv Msg: ID= 1 , len= 18 , data= ping...ping...ping ...
好了,我們的Zinx已經成功的整合訊息的封裝功能了,這樣我們就有Zinx的通訊的基本協議標準了。
關於作者:
作者: Aceld(劉丹冰)
簡書號: IT無崖子
mail: [email protected]
github: https://github.com/aceld
原創書籍gitbook: http://legacy.gitbook.com/@aceld