Golang 輕量級-高併發socket框架——chitchat
這是基於golang socket 一個輕量級,支援高併發操作的開發框架chitchat。本文將介紹chitchat的基本使用方法;通過原始碼分析該框架的具體工作流程;簡要講解作者留下的Demo檔案和該框架的使用技巧;下載連結。通過該框架,我們可以方便建立起Server-Client長連線並通訊。
使用chitchat
chitchat得以支援高併發連線的關鍵在於其能夠快速響應客戶端發起的連結並及時開啟goroutine確保一對一的通訊。對於使用者而言,只需負責向框架註冊正確的IP socket(ipAddr:ipPort)(注:除非特別說明,否則後續提到的地址Addr均指 addr:port
)並正確編寫用於處理 接收資料
和 異常處理
函式即可正常執行。
開啟一個Server
僅需建立一個Server例項並呼叫其Listen()方法即可使一個Server開始正常工作。一個Server通常只用於監聽一個埠,負責一類事物的排程處理。我們看一下具體排程的API:
func NewServer( ipaddrsocket string, delim byte, readfunc ServerReadFunc, additional interface{}) Server
可以看到,建立一個Server例項需要提供四個引數,分別為 監聽物件
, 分隔符
, 處理函式
, 附加資料
。其中 附加資料
可置為空 (nil) 。
監聽物件
即可供Client連線的IPsocket;當Server讀到一連串資料後,將通過delim 分隔符
將資料切片並交予readfunc處理,多片資料將呼叫多次readfunc。delim可置為0,此時Server將持續讀到EOF後才會交付資料。當delim置為‘\n’時,Server會預設換行交付,此時會根據Windows‘\r\n’作出對應調整; 處理函式
將處理Server交付的資料流; 附加資料
是為了配合readfunc更好的完成對資料的處理。後續在講解 如何編寫readfunc 時會提及如何使用additional給出的資料。
Server實則是一個可供呼叫的對外API介面 interface
,其中包含Listen()方法啟動該Server開始監聽。
func (t *server) Listen() error
Listen是一個非同步方法,如果發現配置引數有誤或埠被佔用等錯誤將會直接返回,否則就在後臺拉起新的goroutine處理具體事務。Listen()方法不阻塞程序,也不會等到後臺goroutine全部正常工作後再返回。
後臺goroutine在運轉處理的過程中若遇到錯誤將通過Err channel告知使用者,因此使用者需要顯式地接收並處理error。注意即使不需要這些error資訊,我們也需要有一個接收的過程,否則會導致後臺程序堵塞。通過 ErrChan()
獲取該Channel:
type Errsocket struct { Errerror RemoteAddr string } func (t *server) ErrChan() <-chan Errsocket
傳送的錯誤訊息包含兩部分, error
和 對端ip(addr:port)
。
當我們想關閉該Server,只需呼叫其 Cut
函式:
func (t *server) Cut() error
Cut()方法會使Server停止監聽Socket,同時釋放所有已連線的Connection。該方法和Listen()一樣,也 不會 等待所有Connection全部關閉後再返回。倘若希望關閉某特定的Connection(當然在我們已經知道該Connection對端連線IPaddr的前提下),我們可以使用 CloseRemote
方法:
func (t *server) CloseRemote(remoteAddr string) error
至此較為重要的Server API已經簡單介紹完成了,另外有些較為簡單的API根據名字便可知道其作用,不再簡單贅述。之後我們會通過一個簡單的例子演示這些API的用法。
type Server interface { Listen() error Cut() error CloseRemote(string) error RangeRemoteAddr() []string GetLocalAddr() string SetDeadLine(time.Duration, time.Duration) ErrChan() <-chan Errsocket Write(interface{}) error }
開啟一個Client
通過NewClient函式建立一個Client例項,並通過呼叫其API方法向服務端發起連線。
func NewClient( ipremotesocket string, delim byte, readfunc ClientReadFunc, additional interface{}) Client {
可以發現,建立Client例項的函式引數與建立Server例項 NewServer
的函參形式和意義基本相同。再次便不再多加解釋。注意的是,v1.0.0版本Client還未能指定自己的ipaddr,只能連線成功後隨機分配;另外,readfunc對於 Server
而言是不可置為空(nil)的,但對於 Client
而言可以置為nil,即忽略所有Server傳送的訊息。再有,對於一對Server/Client而言,其 分隔符delim
應該約定好是相同的,否則可能會出現訊息切分錯誤的情況。
Client通過呼叫 Dial()
方法向Server發起連線。
func (t *client) Dial() error
若連線錯誤,則會返回具體錯誤原因,否則拉起相應goroutine執行後續操作並返回。
關閉連線可使用API提供的Close()命令。
func (t *client) Close()
該函式的作用僅僅是向Client傳送了退出的訊號,若此時還有業務處於執行狀態(如readfunc)則會等待業務正常關閉後再退出。
以下是Client的全部對外API:
type Client interface { Dial() error Close() SetDeadLine(time.Duration) ErrChan() <-chan Errsocket Write(interface{}) error GetRemoteAddr() string GetLocalAddr() string }
最後我們講解 Write()
方法。從函式簽名中可以看到,無論什麼型別都可以傳遞給Write方法進行傳送。但事實上,目前庫自帶的Write()方法只能夠很好的處理以下幾種型別:string;[]byte;不包含任何pointer-value(包括string)的struct(也就是說,如果struct中某資料型別是string,那麼接收方讀到的資料將會出現偏差)。正如框架作者所言,當前的Write()方法不是一個最好的方法。因此,框架提供了一個函式使得使用者可以自定義Write()方法:
type wf func(net.Conn, interface{}, byte) error
func SetWriteFunc(f wf)
readfunc與APIs:
該框架最為重要的核心部分即readfunc的編寫,它的作用是處理由Server/Client遞交的資料片。我們先看一下readfunc的函式簽名:
type ClientReadFunc func([]byte, ReadFuncer) error
type ServerReadFunc func([]byte, ReadFuncer) error
無論是Client的readfunc或Server的readfunc,其函式簽名都是相同的。ReadFuncer是一個介面 interface
,它提供了一系列在readfunc函式中可用的API。稍後我們會對其中部分方法進行講解。
type ReadFuncer interface { GetRemoteAddr() string GetLocalAddr() string GetConn() net.Conn Close() Write(interface{}) error Addon() interface{} }
由於socket只允許傳遞 []byte
型別的資料,因此我們要做的第一步就是將[]byte型別轉變為我們希望的資料型別。如果寫入的是一個數據型別,我們想從[]byte轉為struct可使用:
var t = *(**yourStruct)(unsafe.Pointer(&str))
這裡將 yourStruct
替換為你自己的結構體別名即可。若是string型別,則簡單使用型別轉換即可。
readfunc提供了夠用的API,包括獲得本地/遠端IP socket與Conn,傳送資料,關閉連線,獲得 附加資料
。還記得 附加資料
嗎,這是我們在最初建立Server/Client例項時傳入的一個引數,現在可以通過 Addon()
將其取出來使用了。一般建議傳入的是一個指標型別的Addon,這樣readfunc可對其進行修改。
關於 Close()
函式:不用擔心在readfunc中使用Close()方法會提前終止readfunc業務,導致資料無法正常交付。正如前文所言,Close()只是向框架傳遞一個關閉的訊號。框架會等待readfunc全部執行完畢後再關閉這個連線。
原始碼分析
在分析Demo之前,我們先簡單探究一下約600多行的原始碼,看一下其內部各goroutine的支配執行情況。
Chitchat - goroutine
當 Server 呼叫Listen()方法時,Server內部會拉起一個 hL
goroutine(handleListen);當成功響應Client的Dial方法時, hL
拉起新的goroutine hC4s
(handleConnforServer); hC4s
通過拉起 read
讀取DATA並負責將DATA交付給 Readfunc
。一個 hC4s
對應一個連線,多個連線將開啟多個 hC4s
和 read
。 Client 向服務端發起連線成功後也將拉起一個goroutine hC4c
(handleConnforClient)和 read
。 eD 是一個較為特殊的goroutine,他負責使用者監聽的errChannel是否處於關閉狀態並將goroutine產生的錯誤資料傳遞給使用者。本文將不詳細分析eD,詳情可以參考這篇文章 多goroutine非同步通知error的一種方法 。
當 Server 呼叫Cut()方法關閉監聽後,它將關閉hL與所有的hC4s和read,以及負責錯誤轉發的eD,同時關閉errChannel;呼叫Close()/CloseRemote(...)方法時,僅關閉當前連線對應的hC4s和read,不關閉errChannel; Client 呼叫Close()關閉連線後,將關閉開啟的所有goroutine和errChannel。
hC4s和hC4c大同小異,我們著重分析一下hC4s原始碼:
func handleConnServer(h *hConnerServer, eC chan Errsocket, ctx context.Context, s *server) {...} type hConnerServer struct { connnet.Conn dbyte mu*sync.Mutex readfunc ServerReadFunc }
hConnerServer
結構體主要包含了以下內容:連線例項 conn ,分隔符 d ,普通鎖 mu , readfunc ,其中mutex主要用於維護eD的正常工作; eC
是上游傳遞下來的錯誤傳送通道;監聽ctx.Done()保證與上游一起收到退出訊號,但不保證退出的順序; server
提供readfunc使用的API。defer()語句保證了當hC4s退出時,將安全關閉conn和eD goroutine。
拉起的 read
goroutine將讀到的DATA分片通過channel傳送給hC4s,hC4s將該DATA交給readfunc處理:
//hC4s case strReq, ok := <-strReqChan: //read a data slice successfully if !ok { return //EOF && d!=0 } err := h.readfunc(strReq, &server{ currentConn: h.conn, delimiter:h.d, remoteMap:s.remoteMap, additional:s.additional, }) if err != nil { h.mu.Lock() eC <- Errsocket{err, h.conn.RemoteAddr().String()} } }
一個server struct同時實現了Server interface和ReadFuncer interface中的所有方法,並通過介面的方式將特定的方法暴露給框架的使用者,這樣設計使一些重複的方法在程式碼上得到複用。
hC4c在這段程式碼上稍有不同:
//hC4c case strReq, ok := <-strReqChan: //read a data slice successfully if !ok { return //EOF && d!=0 } if h.readfunc != nil { h.rcmu.Lock() err := h.readfunc(strReq, client) if err != nil { h.eD.mu.Lock() eC <- Errsocket{err, h.conn.RemoteAddr().String()} } h.rcmu.Unlock() } }
區別在於:
- 對於Client而言,其readfunc是可為nil的,這樣正常讀資料但不會被處理;
- 與Server相比,多了一個rcmu的鎖。該鎖是防止在readfunc中呼叫了Close()方法後error Channel被提前關閉,導致readfunc的錯誤資訊無法被正確送達。我們可以看一下Client的Close()方法:
func (t *client) Close() { go func() { t.rcmu.Lock() t.mu.Lock() t.closed = true close(t.eU) t.mu.Unlock() t.rcmu.Unlock() t.cancelfunc() }() }
可以看到,Close()方法會等待rcmu鎖被釋放後再執行後續操作。而為什麼Server不需要為之加鎖呢。因為Server在readfunc呼叫的Close()方法不會關閉上游的error Channel。
Server通過併發安全的map儲存每個Conn對應的ip socket與cancelFunc,保證能夠獨立關閉任意Conn。
Demo分析
與上文一樣,首先將各goroutine運作與排程的流程關係通過圖的形式表現出來,並簡要解釋各goroutine的作用:
demo - goroutine.jpg
Master提供的Listen()方法將註冊一個名為 registerNode 的 readfunc
;當 Node 節點向Master註冊成功後,Node節點拉起一個 dHBL (daemon-HeartBeatListener) goroutine,在7939埠發起監聽並註冊 hb4node readfunc
,用於接收ping報文併發送pong迴應;Master會拉起一個 dHBC (daemon-HeartBeatChecker),定時向Node端發起連線併發送ping報文,並註冊 hb4master readfunc
,當成功接收到pong報文後主動關閉連線。若在接收報文訊息過程中出現錯誤,將傳送錯誤訊息至 HBC/L error 錯誤處理器,供作進一步處理。
當 dHBC 連續接收到三次以上錯誤訊息後,判定對端Node失去連線;當 HBL error 十秒以上未收到Master發來的訊息後,判定Master已丟失自己。
Demo Tricks
在hb4master/node readfunc
中,無論結果成功與否,都會發送一個error("succeed"或 具體錯誤),這樣在HBC/L error便可根據error得知此次訊息傳遞的結果,並作進一步操作。
Github
Github: chitchat
或者也可以通過
go get github.com/ovenvan/chitchat
下載並使用。希望大家多pr並提issue,幫助這個框架更加完善。