1. 程式人生 > >NSQ原始碼分析(一)——nsqd的初始化及啟動流程

NSQ原始碼分析(一)——nsqd的初始化及啟動流程

nsq原始碼地址:https://github.com/nsqio/nsq

版本1.1.0 

NSQ原始碼分析系列是我通過閱讀nsq的原始碼及結合網上的相關文章整理而成,由於在網上沒有找到很詳細和完整的文章,故自己親自整理了一份。如果有錯誤的地方,還請指正,希望這系列的文章給您帶來幫助。

 

NSQD啟動流程

   nsqd的啟動流程在nsq/apps/nsqd/nsqd.go中的Start()函式,以下為初始化流程

1.呼叫nsqd.NewOptions()載入引數及引數的預設值
2.將使用者配置的引數值設定到對應的引數
3.隨後判斷config引數是否存在,若存在的話還需進行配置檔案的讀取, 如果配置檔案存在並且符合toml格式,則呼叫cfg.Validate對配置檔案的各項進行進一步的合法性檢查
4.呼叫nsqd.New(opts)

,初始化NSQD結構體,並檢驗Options配置資訊是否有誤
5.呼叫LoadMetadata(),主要是載入元資料資訊
6.呼叫PersistMetadata(),主要將元資料持久化到檔案中
5.呼叫nsqd.Main(),啟動nsqd服務

ps:後面會詳細介紹各個啟動流程

func (p *program) Start() error {
	opts := nsqd.NewOptions()

	flagSet := nsqdFlagSet(opts)
	flagSet.Parse(os.Args[1:])

	rand.Seed(time.Now().UTC().UnixNano())

	if flagSet.Lookup("version").Value.(flag.Getter).Get().(bool) {
		fmt.Println(version.String("nsqd"))
		os.Exit(0)
	}

	var cfg config
	configFile := flagSet.Lookup("config").Value.String()
	if configFile != "" {
		_, err := toml.DecodeFile(configFile, &cfg)
		if err != nil {
			log.Fatalf("ERROR: failed to load config file %s - %s", configFile, err.Error())
		}
	}
	cfg.Validate()

	options.Resolve(opts, flagSet, cfg)
	nsqd := nsqd.New(opts)

	err := nsqd.LoadMetadata()
	if err != nil {
		log.Fatalf("ERROR: %s", err.Error())
	}
	err = nsqd.PersistMetadata()
	if err != nil {
		log.Fatalf("ERROR: failed to persist metadata - %s", err.Error())
	}
	nsqd.Main()

	p.nsqd = nsqd
	return nil
}

一、配置初始化及解析

opts := nsqd.NewOptions()

flagSet := nsqdFlagSet(opts)
flagSet.Parse(os.Args[1:])

二、NSQD初始化

NSQD的初始化主要在nsq/nsqd/nsqd.go檔案的New(opts *Options)函式中

主要作用是:初始化NSQD,並檢驗Options的配置資訊

 

三、元資料載入和持久化

元資料的載入和初始化主要在LoadMetadata()和PersistMetadata()中

下面主要看下LoadMetadata()方法

1.元資料以json格式儲存在nsqd可執行檔案目錄下的nsqd.dat檔案中。
2.讀取檔案中的json資料並轉換成反序列化成meta結構體,得到系統中存在的topic列表,遍歷列表中的topic:
  (1)檢查topic名稱是否合法(長度在1-64之間,滿足正則表示式^[\.a-zA-Z0-9_-]+(#ephemeral)?$),若不合法則忽略
  (2) 使用GetTopic()函式通過名字獲得topic物件
  (3)判斷當前topic物件是否處於暫停狀態,是的話呼叫Pause函式暫停topic
  (4) 獲取當前topic下所有的channel,並且遍歷channel,執行的操作與topic基本一致
        (i)檢查channel名稱是否合法(長度在1-64之間,滿足正則表示式^[\.a-zA-Z0-9_-]+(#ephemeral)?$),若不合法則忽略
        (ii)使用GetChannel函式通過名字獲得channel物件
        (iii)判斷當前channel物件是否處於暫停狀態,是的話呼叫Pause函式暫停channel
至此,元資料的載入完成

func (n *NSQD) LoadMetadata() error {
	atomic.StoreInt32(&n.isLoading, 1)
	defer atomic.StoreInt32(&n.isLoading, 0)

	fn := newMetadataFile(n.getOpts())

	data, err := readOrEmpty(fn)
	if err != nil {
		return err
	}
	if data == nil {
		return nil // fresh start
	}

	var m meta
	err = json.Unmarshal(data, &m)
	if err != nil {
		return fmt.Errorf("failed to parse metadata in %s - %s", fn, err)
	}

	for _, t := range m.Topics {
		if !protocol.IsValidTopicName(t.Name) {
			n.logf(LOG_WARN, "skipping creation of invalid topic %s", t.Name)
			continue
		}
		topic := n.GetTopic(t.Name)
		if t.Paused {
			topic.Pause()
		}
		for _, c := range t.Channels {
			if !protocol.IsValidChannelName(c.Name) {
				n.logf(LOG_WARN, "skipping creation of invalid channel %s", c.Name)
				continue
			}
			channel := topic.GetChannel(c.Name)
			if c.Paused {
				channel.Pause()
			}
		}
		topic.Start()
	}
	return nil
}
PersistMetadata()
1.當前的topic和channel資訊寫入nsqd.dat檔案中,主要步驟是忽略#ephemeral結尾的topic和channel後將topic和channel列表json序列化後寫回檔案中
2.當專案啟動,退出,Topic和Channel變化,以及Topic和Channel暫停狀態改變都會呼叫方法持久化到該檔案中

四、NSQD服務啟動

主要在nsq/nsqd/nsqd.go中的Main()函式

1.開啟一個tcp服務,用於監聽tcp連線
     (1)當有新的客戶端連線後,檢測協議版本號
     (2)最後呼叫protocol_v2的IOLoop(conn net.Conn)進行客戶端讀寫操作  (以後還會詳細介紹IOLoop方法)
2.開啟一個協程,開啟一個http的Serve,用於監聽http請求
3.可選,如果配置tls資訊,則新建一個https的服務,用於監聽https請求
4.呼叫queueScanLoop()方法 作用:
   (1)定期   執行根據channel的數量控制worker Pool中worker的數量
   (2)定期   檢測channel中是否有可以投遞的訊息,如果有,則投遞訊息
5.呼叫lookupLoop()方法 作用:
   (1)定時檢測nsqd和nsqlookupd的連線資訊(預設15s,執行一次PING命令來監聽)
   (2)有Channel和Topic更新,則傳送給所有配置的nsqlookupd
    (3)更新nsqlookupd配置
6.如果配置的StatsdAddress不為空,則呼叫statsdLoop,用於統計相關資訊

 

func (n *NSQD) Main() {
	var err error
	ctx := &context{n}


	n.tcpListener, err = net.Listen("tcp", n.getOpts().TCPAddress)
	if err != nil {
		n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().TCPAddress, err)
		os.Exit(1)
	}
	n.httpListener, err = net.Listen("tcp", n.getOpts().HTTPAddress)
	if err != nil {
		n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().HTTPAddress, err)
		os.Exit(1)
	}
	
	//如果有tls配置,用於監聽https客戶端請求資料
	if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
		n.httpsListener, err = tls.Listen("tcp", n.getOpts().HTTPSAddress, n.tlsConfig)
		if err != nil {
			n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().HTTPSAddress, err)
			os.Exit(1)
		}
	}

	tcpServer := &tcpServer{ctx: ctx}
	//開啟一個協程,用於監聽客戶端連線,sync.WaitGroup.add(1),如果該協程結束,則Done()減1
	n.waitGroup.Wrap(func() {
		protocol.TCPServer(n.tcpListener, tcpServer, n.logf)
	})
	//httpServer為http Serve的 Handler,定義了路由與對應得分方法
	httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
	//開啟一個協程,開啟一個http的Serve,用於監聽http請求
	n.waitGroup.Wrap(func() {
		http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf)
	})
	//開啟一個協程 處理一個 http的Serve,用於監聽https請求
	if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
		httpsServer := newHTTPServer(ctx, true, true)
		n.waitGroup.Wrap(func() {
			http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf)
		})
	}

	n.waitGroup.Wrap(n.queueScanLoop)
	n.waitGroup.Wrap(n.lookupLoop)
	if n.getOpts().StatsdAddress != "" {
		n.waitGroup.Wrap(n.statsdLoop)
	}
}

 

執行完Main函式後,配置和初始化工作全部完成,各個元件啟動執行,而主goroutine會阻塞在<-signalChan處,直到收到中斷程式的訊號,隨後執行nsqd.Exit()函式。
Exit()函式將進行socket關閉等清理工作,隨後結束整個程式的執行。