NSQ原始碼-nsqlookupd
為什麼選擇nsq
之前一直在用erlang做電信產品的開發,對erlang的一些生態也比較瞭解,和erlang相關的產品在網際網路公司使用最多的應該就是rabbitmq了,也許很多人聽說過erlang就是因為他們公司在使用rabbitmq。在之前也看過一點rabbitmq的程式碼,以及後來的emqtt都看過一點, 所以對訊息佇列這塊是情有獨鍾。轉到go後也在關注訊息佇列這塊,nsq是一個golng的訊息系統, 而且架構也非常的簡單。所以想通過原始碼的學習來掌握一些語言技巧。
nsq的架構與程式碼結構
nsq的的話主要有三個模組構成, 這裡直接複製官方的介紹:
nsqd: is the daemon that receives, queues, and delivers messages to clients.
nsqlookupd: is the daemon that manages topology information and provides an eventually consistent discovery service.
nsqadmin: is a web UI to introspect the cluster in realtime (and perform various administrative tasks).
這裡是一個訊息投遞的過程, 顯示了訊息怎麼從nsqd到達consumer, 缺少了producer和nsqlookupd. nsqlookupd主要提供了兩個功能:
- 向nsqd提供一個topic和channel的註冊資訊
- 對consumser提供了toic和channel的查詢功能然後
consumer查詢到nsqd之後就是上面看到的動態圖了, consumer直接和nsqd通訊, 下面是一個更全面一點的時序圖
整個專案的程式碼結構也是圍繞上面的三個模組構建:
- internal(公共部分的實現)
- nsqadmin(對nsqadmin的時間)
- nsqd(對nsqd的實現)
- nsqlookupd(對nsqlookupd的實現)
總共也就這四個package,是不是有很想看下去的衝動(smile).
lookupd的啟動流程
經過上面的介紹,我們對lookupd有裡簡單的認識.首先他是一個獨立的程序, 為topic和channel的發現服務. 但不參與時間的訊息投遞. 對lookup的實現是在nsq/apps/nsqlookupd/nsqlookupd.go和nsq/nsqlookupd/中. lookupd的啟動是使用了一個叫ofollow,noindex" target="_blank">go-srv 的windows wrapper.通過在nsq/apps/nsqlookupd/nsqlookupd.go中實現:
type Service interface { // Init is called before the program/service is started and after it's // determined if the program is running as a Windows Service. Init(Environment) error // Start is called after Init. This method must be non-blocking. Start() error // Stop is called in response to os.Interrupt, os.Kill, or when a // Windows Service is stopped. Stop() error }
來完成整個程序的管理,go-srv幫助我們做了系統訊號的管理, 下面來看下lookupd的啟動流程,
例項化一個NSQLookupd物件
// apps/nsqlookupd/nsqlookupd.go daemon := nsqlookupd.New(opts)// 例項化一個NSQLookupd的物件 err := daemon.Main()// 開始啟動NSQLookupd // nsq/nsqlookupd/nsqlookupd.go func New(opts *Options) *NSQLookupd { .... n := &NSQLookupd{ opts: opts,// 啟動引數 DB:NewRegistrationDB(), // 內從裡面的一個資料庫,主要用來儲存tpoic/channel以及nsqd的訊息 } ... return n }
開始啟動
// Main starts an instance of nsqlookupd and returns an // error if there was a problem starting up. func (l *NSQLookupd) Main() error { ctx := &Context{l} // 啟動兩場go routine來處理tcp/http的請求 tcpListener, err := net.Listen("tcp", l.opts.TCPAddress) if err != nil { return fmt.Errorf("listen (%s) failed - %s", l.opts.TCPAddress, err) } httpListener, err := net.Listen("tcp", l.opts.HTTPAddress) if err != nil { return fmt.Errorf("listen (%s) failed - %s", l.opts.TCPAddress, err) } l.tcpListener = tcpListener l.httpListener = httpListener tcpServer := &tcpServer{ctx: ctx} l.waitGroup.Wrap(func() { protocol.TCPServer(tcpListener, tcpServer, l.logf) }) httpServer := newHTTPServer(ctx) l.waitGroup.Wrap(func() { http_api.Serve(httpListener, httpServer, "HTTP", l.logf) }) return nil }
下面是一個lookupd裡面的程序模型
lookupd裡的主要資料結構
在上面建立一個instance的時候我們看到建立一個NewRegistrationDB()的函式, 這裡就是儲存lookupd所有資料結構的地方.
每個topic/channe/clientl就是一個Registration的key, 然後value對應的就是該topic/channel對應的nsqd資訊.所有的介面都是在操作上面的那個資料結構.
lookupd和其他模組的互動
在程序模型中我們看到一個tcp server和一個http seerver, 和其他模組之間的互動都是在裡面完成的.看下tcp server的處理
有新的tcp連線進來,建立一個新的go routine去服務該請求
// /nsq/internal/tcp_server.go func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) { for { ... go handler.Handle(clientConn) }
例項化一個protocol物件
// /nsq/nsqlookupd/tcp.go func (p *tcpServer) Handle(clientConn net.Conn) { ... prot.IOLoop(clientConn) ... }
對請求的具體處理
// /nsq/nsqlookupd/lookup_protocol_v1.go func (p *LookupProtocolV1) IOLoop(conn net.Conn) error { ... p.Exec(client, reader, params) ... } // /nsq/nsqlookupd/lookup_protocol_v1.go func (p *LookupProtocolV1) Exec(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) { switch params[0] { case "PING": // NSQD的心跳包 return p.PING(client, params) case "IDENTIFY": // NQSD啟動時候的indentify就是我們上面看到的peerInfo return p.IDENTIFY(client, reader, params[1:]) case "REGISTER": // 註冊topic/channel資訊到lookupd return p.REGISTER(client, reader, params[1:]) case "UNREGISTER": // unregister topic/lookup 資訊 return p.UNREGISTER(client, reader, params[1:]) } return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0])) }
上面就是整個tcp server的流程, 每個連線都是一個go routine. 相對tcp server來說的話http server就簡單很多, 如果你對httprouter熟悉的話就更簡單了就是對RegistrationDB的增刪查改. http測的api的話可以參考:
官方的文件總結
lookupd是其中比較簡單的模組,通過原始碼的學習我們可以更好的掌握go的一些技巧,也鼓勵大家通過一一些開源的程式碼來掌握語言的一些技巧。其實通過lookupd我們可以抽象一套自己的HTTP/TCP服務端架構來。