NSQ原始碼分析(一)——nsqd的初始化及啟動流程
nsq原始碼地址:https://github.com/nsqio/nsq
版本1.1.0
NSQ原始碼分析系列是我通過閱讀nsq的原始碼及結合網上的相關文章整理而成,由於在網上沒有找到很詳細和完整的文章,故自己親自整理了一份。如果有錯誤的地方,還請指正,希望這系列的文章給您帶來幫助。
NSQD啟動流程
nsqd的啟動流程在nsq/apps/nsqd/nsqd.go中的Start()函式,以下為初始化流程
1.呼叫nsqd.NewOptions()載入引數及引數的預設值
2.將使用者配置的引數值設定到對應的引數
3.隨後判斷config引數是否存在,若存在的話還需進行配置檔案的讀取, 如果配置檔案存在並且符合toml格式,則呼叫cfg.Validate對配置檔案的各項進行進一步的合法性檢查
4.呼叫nsqd.New(opts)
5.呼叫LoadMetadata(),主要是載入元資料資訊
6.呼叫PersistMetadata(),主要將元資料持久化到檔案中
5.呼叫nsqd.Main(),啟動nsqd服務
ps:後面會詳細介紹各個啟動流程
func (p *program) Start() error { opts := nsqd.NewOptions() flagSet := nsqdFlagSet(opts) flagSet.Parse(os.Args[1:]) rand.Seed(time.Now().UTC().UnixNano()) if flagSet.Lookup("version").Value.(flag.Getter).Get().(bool) { fmt.Println(version.String("nsqd")) os.Exit(0) } var cfg config configFile := flagSet.Lookup("config").Value.String() if configFile != "" { _, err := toml.DecodeFile(configFile, &cfg) if err != nil { log.Fatalf("ERROR: failed to load config file %s - %s", configFile, err.Error()) } } cfg.Validate() options.Resolve(opts, flagSet, cfg) nsqd := nsqd.New(opts) err := nsqd.LoadMetadata() if err != nil { log.Fatalf("ERROR: %s", err.Error()) } err = nsqd.PersistMetadata() if err != nil { log.Fatalf("ERROR: failed to persist metadata - %s", err.Error()) } nsqd.Main() p.nsqd = nsqd return nil }
一、配置初始化及解析
opts := nsqd.NewOptions()
flagSet := nsqdFlagSet(opts)
flagSet.Parse(os.Args[1:])
二、NSQD初始化
NSQD的初始化主要在nsq/nsqd/nsqd.go檔案的New(opts *Options)函式中
主要作用是:初始化NSQD,並檢驗Options的配置資訊
三、元資料載入和持久化
元資料的載入和初始化主要在LoadMetadata()和PersistMetadata()中
下面主要看下LoadMetadata()方法
1.元資料以json格式儲存在nsqd可執行檔案目錄下的nsqd.dat檔案中。
2.讀取檔案中的json資料並轉換成反序列化成meta結構體,得到系統中存在的topic列表,遍歷列表中的topic:
(1)檢查topic名稱是否合法(長度在1-64之間,滿足正則表示式^[\.a-zA-Z0-9_-]+(#ephemeral)?$),若不合法則忽略
(2) 使用GetTopic()函式通過名字獲得topic物件
(3)判斷當前topic物件是否處於暫停狀態,是的話呼叫Pause函式暫停topic
(4) 獲取當前topic下所有的channel,並且遍歷channel,執行的操作與topic基本一致
(i)檢查channel名稱是否合法(長度在1-64之間,滿足正則表示式^[\.a-zA-Z0-9_-]+(#ephemeral)?$),若不合法則忽略
(ii)使用GetChannel函式通過名字獲得channel物件
(iii)判斷當前channel物件是否處於暫停狀態,是的話呼叫Pause函式暫停channel
至此,元資料的載入完成
func (n *NSQD) LoadMetadata() error {
atomic.StoreInt32(&n.isLoading, 1)
defer atomic.StoreInt32(&n.isLoading, 0)
fn := newMetadataFile(n.getOpts())
data, err := readOrEmpty(fn)
if err != nil {
return err
}
if data == nil {
return nil // fresh start
}
var m meta
err = json.Unmarshal(data, &m)
if err != nil {
return fmt.Errorf("failed to parse metadata in %s - %s", fn, err)
}
for _, t := range m.Topics {
if !protocol.IsValidTopicName(t.Name) {
n.logf(LOG_WARN, "skipping creation of invalid topic %s", t.Name)
continue
}
topic := n.GetTopic(t.Name)
if t.Paused {
topic.Pause()
}
for _, c := range t.Channels {
if !protocol.IsValidChannelName(c.Name) {
n.logf(LOG_WARN, "skipping creation of invalid channel %s", c.Name)
continue
}
channel := topic.GetChannel(c.Name)
if c.Paused {
channel.Pause()
}
}
topic.Start()
}
return nil
}
PersistMetadata()
1.當前的topic和channel資訊寫入nsqd.dat檔案中,主要步驟是忽略#ephemeral結尾的topic和channel後將topic和channel列表json序列化後寫回檔案中
2.當專案啟動,退出,Topic和Channel變化,以及Topic和Channel暫停狀態改變都會呼叫方法持久化到該檔案中
四、NSQD服務啟動
主要在nsq/nsqd/nsqd.go中的Main()函式
1.開啟一個tcp服務,用於監聽tcp連線
(1)當有新的客戶端連線後,檢測協議版本號
(2)最後呼叫protocol_v2的IOLoop(conn net.Conn)進行客戶端讀寫操作 (以後還會詳細介紹IOLoop方法)
2.開啟一個協程,開啟一個http的Serve,用於監聽http請求
3.可選,如果配置tls資訊,則新建一個https的服務,用於監聽https請求
4.呼叫queueScanLoop()方法 作用:
(1)定期 執行根據channel的數量控制worker Pool中worker的數量
(2)定期 檢測channel中是否有可以投遞的訊息,如果有,則投遞訊息
5.呼叫lookupLoop()方法 作用:
(1)定時檢測nsqd和nsqlookupd的連線資訊(預設15s,執行一次PING命令來監聽)
(2)有Channel和Topic更新,則傳送給所有配置的nsqlookupd
(3)更新nsqlookupd配置
6.如果配置的StatsdAddress不為空,則呼叫statsdLoop,用於統計相關資訊
func (n *NSQD) Main() {
var err error
ctx := &context{n}
n.tcpListener, err = net.Listen("tcp", n.getOpts().TCPAddress)
if err != nil {
n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().TCPAddress, err)
os.Exit(1)
}
n.httpListener, err = net.Listen("tcp", n.getOpts().HTTPAddress)
if err != nil {
n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().HTTPAddress, err)
os.Exit(1)
}
//如果有tls配置,用於監聽https客戶端請求資料
if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
n.httpsListener, err = tls.Listen("tcp", n.getOpts().HTTPSAddress, n.tlsConfig)
if err != nil {
n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().HTTPSAddress, err)
os.Exit(1)
}
}
tcpServer := &tcpServer{ctx: ctx}
//開啟一個協程,用於監聽客戶端連線,sync.WaitGroup.add(1),如果該協程結束,則Done()減1
n.waitGroup.Wrap(func() {
protocol.TCPServer(n.tcpListener, tcpServer, n.logf)
})
//httpServer為http Serve的 Handler,定義了路由與對應得分方法
httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
//開啟一個協程,開啟一個http的Serve,用於監聽http請求
n.waitGroup.Wrap(func() {
http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf)
})
//開啟一個協程 處理一個 http的Serve,用於監聽https請求
if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
httpsServer := newHTTPServer(ctx, true, true)
n.waitGroup.Wrap(func() {
http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf)
})
}
n.waitGroup.Wrap(n.queueScanLoop)
n.waitGroup.Wrap(n.lookupLoop)
if n.getOpts().StatsdAddress != "" {
n.waitGroup.Wrap(n.statsdLoop)
}
}
執行完Main
函式後,配置和初始化工作全部完成,各個元件啟動執行,而主goroutine會阻塞在<-signalChan
處,直到收到中斷程式的訊號,隨後執行nsqd.Exit()
函式。Exit()
函式將進行socket
關閉等清理工作,隨後結束整個程式的執行。