1. 程式人生 > >NSQ原始碼分析(二)—— Topic

NSQ原始碼分析(二)—— Topic

Topic是NSQ非常重要的概念,本次主要講述Topic的獲取、新建、Topic中訊息的輪詢、Topic中訊息的來源、Topic的刪除和退出以及Topic的暫停和取消暫停

topic的相關操作主要在nsq/nsqd/topic.go中

首先看下Topic結構體

type Topic struct {
	// 64bit atomic vars need to be first for proper alignment on 32bit platforms
	messageCount uint64    //訊息數量

	sync.RWMutex

	name              string    //topic的名稱
	channelMap        map[string]*Channel   //該topic關聯的Channel資訊
	backend           BackendQueue      //用於接收訊息,訊息存放到磁碟
	memoryMsgChan     chan *Message    //用於接收訊息,訊息存放到記憶體
	startChan         chan int   //用於阻塞messagePump,直到收到startChan訊號
	exitChan          chan int    //退出的通道
	channelUpdateChan chan int   //channel有變更時傳送通知的通道
	waitGroup         util.WaitGroupWrapper
	exitFlag          int32   //退出
	idFactory         *guidFactory

	ephemeral      bool   //是否是臨時的topic
	deleteCallback func(*Topic)   //執行刪除的函式
	deleter        sync.Once   //只執行一次

	paused    int32   //是否暫停  1 暫停,0不暫停
	pauseChan chan int   //傳送暫停或取消暫停的訊息

	ctx *context   //上下文,儲存nsqd指標
}

 

一、Topic的獲取  

GetTopic以協程安全執行並返回指向Topic物件的指標(可能是新建立的)

  1.從nsqd例項的topicMap中獲取,如果有則返回

  2.如果topicMap中沒有對應的topic資訊,則新建topic(呼叫NewTopic)

  3.新建topic後,從nsqlookupd獲取對應的channel名稱

  4.如果channel名稱不存在topic的channelMap中,則新建channel

func (n *NSQD) GetTopic(topicName string) *Topic {
	// most likely, we already have this topic, so try read lock first.
	n.RLock()
	//從topicMap中讀取,如果有,則返回
	t, ok := n.topicMap[topicName]
	n.RUnlock()
	if ok {
		return t
	}

	//這次用通用鎖來處理
	n.Lock()

	t, ok = n.topicMap[topicName]
	if ok {
		n.Unlock()
		return t
	}
	
	//topicMap中沒有,則是第一次使用的topic,需要新建
	//定義刪除函式
	deleteCallback := func(t *Topic) {
		n.DeleteExistingTopic(t.name)
	}
	//建立topic
	t = NewTopic(topicName, &context{n}, deleteCallback)
	n.topicMap[topicName] = t

	n.Unlock()

	n.logf(LOG_INFO, "TOPIC(%s): created", t.name)
	// topic is created but messagePump not yet started

	// if loading metadata at startup, no lookupd connections yet, topic started after load
	//如果啟動時正在載入元資料,沒有連線nsqlookupd,topic在載入完元資料後啟動
	if atomic.LoadInt32(&n.isLoading) == 1 {
		return t
	}

	// if using lookupd, make a blocking call to get the topics, and immediately create them.
	// this makes sure that any message received is buffered to the right channels
	//獲取nsqd所配置的lookupd的http地址
	lookupdHTTPAddrs := n.lookupdHTTPAddrs()
	if len(lookupdHTTPAddrs) > 0 {
		//根據topic名稱,從nsqlookupd獲取所有關聯的channel名稱
		channelNames, err := n.ci.GetLookupdTopicChannels(t.name, lookupdHTTPAddrs)
		if err != nil {
			n.logf(LOG_WARN, "failed to query nsqlookupd for channels to pre-create for topic %s - %s", t.name, err)
		}
		for _, channelName := range channelNames {
			if strings.HasSuffix(channelName, "#ephemeral") {//臨時的
				continue // do not create ephemeral channel with no consumer client
			}
			t.GetChannel(channelName)
		}
	} else if len(n.getOpts().NSQLookupdTCPAddresses) > 0 {
		n.logf(LOG_ERROR, "no available nsqlookupd to query for channels to pre-create for topic %s", t.name)
	}

	// now that all channels are added, start topic messagePump
	t.Start()
	return t
}

ps:提出兩個問題可以思考一下

  

什麼時候會呼叫GetTopic函式?

  1.在nsqd啟動的時候呼叫LoadMetadata() 載入元資料時會呼叫GetTopic用於初始化元資料中的Topic和Channel

   2.當生產者PUB訊息的時候,會指定topic,這時也會呼叫GetTopic

為什麼topic對應的channel名稱要從nsqlookupd中獲取?

   可以參考:https://blog.csdn.net/skh2015java/article/details/82747450

  當消費者消費訊息時會指定topic及對應的channel,所以nsqlookupd中維護者topic和channel的對應關係(一對多)

 

二、Topic的新建

topic分為臨時topic和永久topic,臨時topic  backend變數使用newDummyBackendQueue函式初始化。該函式生成一個無任何功能的dummyBackendQueue結構;

對於永久的topic,backend使用newDiskQueue函式返回diskQueue型別賦值,並開啟新的goroutine來進行資料的持久化。

1.初始化Topic結構體

2.對於非臨時topic,則初始化topic的backend為diskQueue,diskQueue是記錄在磁碟檔案中的FIFO佇列(當記憶體佇列滿的時候會用到該磁碟佇列)

3.開啟協程呼叫messagePump函式,messagePump的作用是將受到的佇列(記憶體佇列topic中的memoryMsgChan和磁碟佇列disQueue)中的訊息投遞到topic關聯的所有channel中

4.通知nsqd新建了topic

func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic {
	t := &Topic{
		name:              topicName,
		channelMap:        make(map[string]*Channel),
		memoryMsgChan:     make(chan *Message, ctx.nsqd.getOpts().MemQueueSize),   //記憶體訊息佇列,有快取,MemQueueSize預設是10000
		startChan:         make(chan int, 1),
		exitChan:          make(chan int),
		channelUpdateChan: make(chan int),
		ctx:               ctx,
		paused:            0,
		pauseChan:         make(chan int),
		deleteCallback:    deleteCallback,
		idFactory:         NewGUIDFactory(ctx.nsqd.getOpts().ID),
	}

	if strings.HasSuffix(topicName, "#ephemeral") {   //如果topic的名稱以#ephemeral開頭,如果是則是臨時topic
		t.ephemeral = true
		t.backend = newDummyBackendQueue()
	} else {
		dqLogf := func(level diskqueue.LogLevel, f string, args ...interface{}) {
			opts := ctx.nsqd.getOpts()
			lg.Logf(opts.Logger, opts.logLevel, lg.LogLevel(level), f, args...)
		}
		t.backend = diskqueue.New(
			topicName,   //topic名稱
			ctx.nsqd.getOpts().DataPath,     //資料路徑
			ctx.nsqd.getOpts().MaxBytesPerFile,//每個檔案的最大位元組數
			int32(minValidMsgLength),   //每條訊息的最小長度
			int32(ctx.nsqd.getOpts().MaxMsgSize)+minValidMsgLength,  //每條訊息的最大長度
			ctx.nsqd.getOpts().SyncEvery,
			ctx.nsqd.getOpts().SyncTimeout,
			dqLogf,
		)
	}

	t.waitGroup.Wrap(t.messagePump)

	//通知nsqd新建了Topic
	t.ctx.nsqd.Notify(t)

	return t
}

  messagePump訊息輪詢

messagePump函式主要作用輪詢將記憶體佇列和磁碟佇列中的訊息投遞給該topic關聯的所有Channel,channel的更新,暫停或取消暫停及退出等

1.不在Start()函式呼叫之前接收訊息,即不跳出第一個for迴圈

2.獲取所有該topic對應的Channel

3.當topic對應的Channel的數量大於0,並且該topic不是暫停狀態時初始化memoryMsgChan和backendChan

4.第二個for迴圈中的流程

  (1)從記憶體佇列或磁碟檔案中獲取訊息,並投遞給所有關聯的Channel

  (2)channel更新

  (3)暫停或取消暫停

  (4)退出  

func (t *Topic) messagePump() {
	var msg *Message
	var buf []byte
	var err error
	var chans []*Channel   //該topic對應的所有Channel
	var memoryMsgChan chan *Message    //記憶體訊息佇列
	var backendChan chan []byte   //backend佇列

	// do not pass messages before Start(), but avoid blocking Pause() or GetChannel()
	//不在Start()函式呼叫之前接收訊息
	for {
		select {
		case <-t.channelUpdateChan:  //channel update的訊息通知
			continue
		case <-t.pauseChan:  //暫停
			continue
		case <-t.exitChan:   //退出
			goto exit
		case <-t.startChan:
		}
		break
	}
	t.RLock()
	for _, c := range t.channelMap {  //獲取所有該topic對應的Channel
		chans = append(chans, c)
	}
	t.RUnlock()
	if len(chans) > 0 && !t.IsPaused() {    //如果Channel的長度大於0,並且topic不是暫停狀態
		memoryMsgChan = t.memoryMsgChan  //獲取記憶體佇列
		backendChan = t.backend.ReadChan()  //獲取backendChan
	}

	// main message loop
	for {
		select {
		case msg = <-memoryMsgChan:   //從記憶體中獲取
		case buf = <-backendChan:  //從backendChan中獲取
			msg, err = decodeMessage(buf)  //需要將buf解碼成msg
			if err != nil {
				t.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
				continue
			}
		case <-t.channelUpdateChan:  //channel更新
			chans = chans[:0]  //從新獲取chans
			t.RLock()
			for _, c := range t.channelMap {
				chans = append(chans, c)
			}
			t.RUnlock()
			if len(chans) == 0 || t.IsPaused() {  //如果Channel的個數為0或者topic是暫停,則將memoryMsgChan和backendChan置為nil
				memoryMsgChan = nil
				backendChan = nil
			} else { //負責重新指定memoryMsgChan和backendChan
				memoryMsgChan = t.memoryMsgChan
				backendChan = t.backend.ReadChan()
			}
			continue
		case <-t.pauseChan:   //暫停
			if len(chans) == 0 || t.IsPaused() {  //如果Channel的個數為0或者topic是暫停,則將memoryMsgChan和backendChan置為nil
				memoryMsgChan = nil
				backendChan = nil
			} else { //負責重新指定memoryMsgChan和backendChan
				memoryMsgChan = t.memoryMsgChan
				backendChan = t.backend.ReadChan()
			}
			continue
		case <-t.exitChan:  //退出
			goto exit
		}

		//以下為處理收到的msg
		for i, channel := range chans { //遍歷topic的所有的channel
			chanMsg := msg
			// copy the message because each channel
			// needs a unique instance but...
			// fastpath to avoid copy if its the first channel
			// (the topic already created the first copy)
			//複製訊息,因為每個channel需要唯一的例項
			if i > 0 {
				chanMsg = NewMessage(msg.ID, msg.Body)
				chanMsg.Timestamp = msg.Timestamp
				chanMsg.deferred = msg.deferred
			}
			if chanMsg.deferred != 0 { //傳送延時訊息
				channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
				continue
			}
			//傳送即時訊息
			err := channel.PutMessage(chanMsg)
			if err != nil {
				t.ctx.nsqd.logf(LOG_ERROR,
					"TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
					t.name, msg.ID, channel.name, err)
			}
		}
	}

exit:
	t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... messagePump", t.name)
}

 

三、Topic中訊息的獲取

    topic.go檔案中的put函式,是topic中訊息的獲取來源

    上面也提到topic中的訊息存在topic中的memoryMsgChan佇列中,該佇列的預設長度是10000,如果該佇列滿了,則會將訊息存到disQueue磁碟檔案中

當生產者PUB訊息時,會根據topic的名稱,將訊息寫入到對應topic的佇列中

1.如果memoryMsgChan佇列沒滿,則將訊息寫入該佇列

2.如果滿了則將訊息寫入到磁碟中

(1)通過bufferPoolGet函式從buffer池中獲取一個buffer,bufferPoolGet及以下bufferPoolPut函式是對sync.Pool的簡單包裝。 兩個函式位於nsqd/buffer_pool.go中。

(2)呼叫writeMessageToBackend函式將訊息寫入磁碟。

(3)通過bufferPoolPut函式將buffer歸還buffer池。

(4)呼叫SetHealth函式將writeMessageToBackend的返回值寫入errValue變數。 該變數衍生出IsHealthy,GetError和GetHealth3個函式,主要用於測試以及從HTTP API獲取nsqd的執行情況(是否發生錯誤)

func (t *Topic) put(m *Message) error {
	select {
	case t.memoryMsgChan <- m:
	default:
		b := bufferPoolGet()
		err := writeMessageToBackend(b, m, t.backend)
		bufferPoolPut(b)
		t.ctx.nsqd.SetHealth(err)
		if err != nil {
			t.ctx.nsqd.logf(LOG_ERROR,
				"TOPIC(%s) ERROR: failed to write message to backend - %s",
				t.name, err)
			return err
		}
	}
	return nil
}

 四、Topic的關閉和刪除

    

1.deleted為true,則通知nsqd

2.close(t.exitChan)  退出memoryMsgChan函式

3.如果deleted為true,則

   (1)刪除該topic對應的channel

    (2)channel.Delete()清空佇列中的訊息並關閉退出

   (3)t.Empty()清空記憶體佇列和磁碟檔案中的訊息

4.如果deleted為false,則

   (1)關閉topic對應的channel

   (2)呼叫t.flush()將記憶體佇列memoryMsgChan中的訊息寫入到磁碟檔案中

    (3)關閉並退出disQueue(檔案中的訊息是不刪除的)

總結來說:

對於刪除操作,需要清空channelMap並刪除所有channel,然後刪除記憶體和磁碟中所有未投遞的訊息。最後關閉backend管理的的磁碟檔案。

對於關閉操作,不清空channelMap,只是關閉所有的channel,使用flush函式將所有memoryMsgChan中未投遞的訊息用writeMessageToBackend儲存到磁碟中。最後關閉backend管理的的磁碟檔案。

// Delete empties the topic and all its channels and closes
func (t *Topic) Delete() error {
	return t.exit(true)
}

// Close persists all outstanding topic data and closes all its channels
func (t *Topic) Close() error {
	return t.exit(false)
}

func (t *Topic) exit(deleted bool) error {
	if !atomic.CompareAndSwapInt32(&t.exitFlag, 0, 1) {
		return errors.New("exiting")
	}

	if deleted {
		t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): deleting", t.name)

		// since we are explicitly deleting a topic (not just at system exit time)
		// de-register this from the lookupd
		t.ctx.nsqd.Notify(t)
	} else {
		t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): closing", t.name)
	}

	close(t.exitChan)

	// synchronize the close of messagePump()
	t.waitGroup.Wait()

	if deleted {
		t.Lock()
		for _, channel := range t.channelMap {
			delete(t.channelMap, channel.name)
			channel.Delete()
		}
		t.Unlock()

		// empty the queue (deletes the backend files, too)
		t.Empty()
		return t.backend.Delete()
	}

	// close all the channels
	for _, channel := range t.channelMap {
		err := channel.Close()
		if err != nil {
			// we need to continue regardless of error to close all the channels
			t.ctx.nsqd.logf(LOG_ERROR, "channel(%s) close - %s", channel.name, err)
		}
	}

	// write anything leftover to disk
	t.flush()
	return t.backend.Close()
}

 

五、Topic的暫停和取消暫停

      topic的暫停和取消暫停主要是通過原子操作topic中的paused欄位來實現的,paused的值為1則是暫停,0是非暫停狀態

程式碼比較簡單  

func (t *Topic) Pause() error {
	return t.doPause(true)
}

func (t *Topic) UnPause() error {
	return t.doPause(false)
}

func (t *Topic) doPause(pause bool) error {
	if pause {
		atomic.StoreInt32(&t.paused, 1)
	} else {
		atomic.StoreInt32(&t.paused, 0)
	}

	select {
	case t.pauseChan <- 1:
	case <-t.exitChan:
	}

	return nil
}

func (t *Topic) IsPaused() bool {
	return atomic.LoadInt32(&t.paused) == 1
}