1. 程式人生 > >NSQ原始碼剖析(一):NSQD主要結構方法和訊息生產消費過程

NSQ原始碼剖析(一):NSQD主要結構方法和訊息生產消費過程

目錄

  • 1 概述
  • 2 主要結構體及方法
    • 2.1 NSQD
    • 2.2 tcpServer
    • 2.3 protocolV2
    • 2.4 clientV2
    • 2.5 Topic
    • 2.6 channel
  • 3 啟動過程
  • 4 消費和生產過程
    • 4.1 訊息生產
    • 4.2 訊息消費
    • 4.2 延遲消費

1 概述

NSQ包含3個元件:

  • nsqd:每個nsq例項執行一個nsqd程序,負責接收生產者訊息、向nsqlookupd註冊、向消費者推送訊息
  • nsqlookupd:叢集註冊中心,可以有多個,負責接收nsqd的註冊資訊,向消費者提供服務發現
  • nsqadmin:用於監控和管理的web ui

生產者將訊息寫入到指定的主題Topic,同一個Topic下則可以關聯多個管道Channel,每個Channel都會傳輸對應Topic的完整副本。
消費者則訂閱Channel的訊息。於是多個消費者訂閱不同的Channel的話,他們各自都能拿到完整的訊息副本;但如果多個消費者訂閱同一個Channel,則是共享的,即訊息會隨機發送給其中一個消費者。

接下來我們來分析下nsq的原始碼:

  • 原始碼地址:https://github.com/nsqio/nsq
  • 2020年1月18日最新的master分支

nsq各元件均使用上述程式碼倉庫,通過apps目錄下的不同的main包啟動。比如nsqd的main函式在apps/nsqd目錄下,其他類同。

本文件主要分析nsqd的主要結構體和方法,及訊息生產和消費的過程。主要以TCP api為例來分析,HTTP/HTTPS的api類同。

2 主要結構體及方法

2.1 NSQD

nsqd/nsqd.go檔案,NSQD是主例項,一個nsqd程序建立一個nsqd結構體例項,並通過此結構體的Main()方法啟動所有的服務。

type NSQD struct {
    clientIDSequence int64 // 遞增的客戶端ID,每個客戶端連線均從這裡取一個遞增後的ID作為唯一標識

    sync.RWMutex

    opts atomic.Value // 引數選項,真實型別是apps/nsqd/option.go:Options結構體

    dl        *dirlock.DirLock
    isLoading int32
    errValue  atomic.Value
    startTime time.Time

    topicMap map[string]*Topic  // 儲存當前所有的topic

    clientLock sync.RWMutex
    clients    map[int64]Client

    lookupPeers atomic.Value

    tcpServer     *tcpServer
    tcpListener   net.Listener
    httpListener  net.Listener
    httpsListener net.Listener
    tlsConfig     *tls.Config

    poolSize int                // 當前工作協程組的協程數量

    notifyChan           chan interface{}
    optsNotificationChan chan struct{}
    exitChan             chan int
    waitGroup            util.WaitGroupWrapper

    ci *clusterinfo.ClusterInfo
}

主要方法

/*
    程式啟動時呼叫本方法,執行下面的動作:
        - 啟動TCP/HTTP/HTTPS服務
        - 啟動工作協程組:NSQD.queueScanLoop
        - 啟動服務註冊:NSQD.lookupLoop
*/
func (n *NSQD) Main() error

// 負責管理工作協程組的數量,每呼叫一次NSQD.queueScanWorker()方法啟動一個工作協程
func (n *NSQD) queueScanLoop()

// 由queueScanLoop()呼叫,負責啟動工作協程組並動態調整協程數量。工作協程的數量為當前的channel數 * 0.25
func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int)

// 這是具體的工作協程,監聽workCh,對收到的待處理Channel做兩個動作,一是將超時的訊息重新入隊;二是將到時間的延時訊息入隊
func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int)

/*
    lookupLoop()方法與nsqlookupd建立連線,負責向nsqlookupd註冊topic,並定時傳送心跳包
*/
func (n *NSQD) lookupLoop()

2.2 tcpServer

nsqd/tcp.go檔案,tcpServer通過Handle()方法接收TCP請求。
tcpServer是nsqd結構的成員,全域性也就只有一個例項,但在protocol包的TCPServer方法中,每建立一個新的連線,均會呼叫一次tcpServer.Handle()

type tcpServer struct {
    ctx   *context
    conns sync.Map
}

主要方法

/*
    p.nsqd.Main()啟動protocol.TCPServer(),這個方法裡會為每個客戶端連線建立一個新協程,協程執行tcpServer.Handle()方法
    本方法首先對新連線讀取4位元組校驗版本,新連線必須首先發送4位元組"  V2"。
    然後阻塞呼叫nsqd.protocolV2.IOLoop()處理客戶端接下來的請求。
*/
func (p *tcpServer) Handle(clientConn net.Conn)

2.3 protocolV2

nsqd/protocol_v2.go檔案,protocolV2負責處理過來的具體的使用者請求。
每個連線均會建立一個獨立的protocolV2例項(由tcpServer.Handle()建立)

type protocolV2 struct {
    ctx *context
}

主要方法

/*
    tcpServer.Handle()阻塞呼叫本方法
    1. 啟用一個獨立協程向消費者推送訊息protocolV2.messagePump()
    2. for迴圈接收並處理客戶端請求protocolV2.Exec()
*/
func (p *protocolV2) IOLoop(conn net.Conn) error

// 組裝訊息並呼叫protocolV2.Send()傳送給消費者
func (p *protocolV2) SendMessage(client *clientV2, msg *Message) error

// 向客戶端傳送資料幀
func (p *protocolV2) Send(client *clientV2, frameType int32, data []byte) error

// 解析客戶端請求的指令,呼叫對應的指令方法
func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error)

// 負責向消費者推送訊息
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool)

// 下面這組方法是NSQD支援的指令對應的處理方法
func (p *protocolV2) IDENTIFY(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) AUTH(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) RDY(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) FIN(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) REQ(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) CLS(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) NOP(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) MPUB(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) DPUB(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) TOUCH(client *clientV2, params [][]byte) ([]byte, error)

2.4 clientV2

nsqd/client_v2.go檔案,儲存每個客戶端的連線資訊。
clientV2例項由protocolV2.IOLoop()建立,每個連線均有一個獨立的例項。

type clientV2 struct {
    // 64bit atomic vars need to be first for proper alignment on 32bit platforms
    ReadyCount    int64
    InFlightCount int64
    MessageCount  uint64
    FinishCount   uint64
    RequeueCount  uint64

    pubCounts map[string]uint64

    writeLock sync.RWMutex
    metaLock  sync.RWMutex

    ID        int64
    ctx       *context
    UserAgent string

    // original connection
    net.Conn

    // connections based on negotiated features
    tlsConn     *tls.Conn
    flateWriter *flate.Writer

    // reading/writing interfaces
    Reader *bufio.Reader
    Writer *bufio.Writer

    OutputBufferSize    int
    OutputBufferTimeout time.Duration

    HeartbeatInterval time.Duration

    MsgTimeout time.Duration

    State          int32
    ConnectTime    time.Time
    Channel        *Channel
    ReadyStateChan chan int
    ExitChan       chan int

    ClientID string
    Hostname string

    SampleRate int32

    IdentifyEventChan chan identifyEvent
    SubEventChan      chan *Channel

    TLS     int32
    Snappy  int32
    Deflate int32

    // re-usable buffer for reading the 4-byte lengths off the wire
    lenBuf   [4]byte
    lenSlice []byte

    AuthSecret string
    AuthState  *auth.State
}

2.5 Topic

nsqd/topic.go檔案,對應於每個topic例項,由系統啟動時建立或者釋出訊息/消費訊息時自動建立。

type Topic struct {
    messageCount uint64 // 累計訊息數
    messageBytes uint64 // 累計訊息體的位元組數

    sync.RWMutex

    name              string                // topic名,生產和消費時需要指定此名稱
    channelMap        map[string]*Channel   // 儲存每個channel name和channel指標的對映
    backend           BackendQueue          // 磁碟佇列,當記憶體memoryMsgChan滿時,寫入硬碟佇列
    memoryMsgChan     chan *Message         // 訊息優先存入這個記憶體chan
    startChan         chan int
    exitChan          chan int
    channelUpdateChan chan int
    waitGroup         util.WaitGroupWrapper
    exitFlag          int32
    idFactory         *guidFactory

    ephemeral      bool
    deleteCallback func(*Topic)
    deleter        sync.Once

    paused    int32
    pauseChan chan int

    ctx *context
}

主要方法

/*
    下面兩個方法負責將訊息寫入topic,底層均呼叫topic.put()方法
    1. topic.memoryMsgChan未滿時,優先寫入記憶體memoryMsgChan
    2. 否則,寫入磁碟topic.backend
*/
func (t *Topic) PutMessage(m *Message) error
func (t *Topic) PutMessages(msgs []*Message) error

/*
    NewTopic建立新的topic時會為每個topic啟動一個獨立執行緒來處理訊息推送,即messagePump()
    此方法迴圈隨機從記憶體memoryMsgChan和磁碟佇列backend中取訊息寫入到topic下每一個chnnel中
*/
func (t *Topic) messagePump()

2.6 channel

nsqd/channel.go檔案,對應於每個channel例項

type Channel struct {
    requeueCount uint64
    messageCount uint64
    timeoutCount uint64

    sync.RWMutex

    topicName string
    name      string
    ctx       *context

    backend BackendQueue        // 磁碟佇列,當記憶體memoryMsgChan滿時,寫入硬碟佇列

    memoryMsgChan chan *Message // 訊息優先存入這個記憶體chan
    exitFlag      int32
    exitMutex     sync.RWMutex

    // state tracking
    clients        map[int64]Consumer
    paused         int32
    ephemeral      bool
    deleteCallback func(*Channel)
    deleter        sync.Once

    // Stats tracking
    e2eProcessingLatencyStream *quantile.Quantile

    // TODO: these can be DRYd up
    deferredMessages map[MessageID]*pqueue.Item // 儲存尚未到時間的延遲消費訊息
    deferredPQ       pqueue.PriorityQueue       // 儲存尚未到時間的延遲消費訊息,最小堆
    deferredMutex    sync.Mutex
    inFlightMessages map[MessageID]*Message     // 儲存已推送尚未收到FIN的訊息
    inFlightPQ       inFlightPqueue             // 儲存已推送尚未收到FIN的訊息,最小堆
    inFlightMutex    sync.Mutex
}

主要方法

/*
    將訊息寫入channel,邏輯與topic的一致,記憶體未滿則優先寫記憶體chan,否則寫入磁碟佇列
*/
func (c *Channel) PutMessage(m *Message) error
func (c *Channel) put(m *Message) error

// 消費超時相關
func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error
func (c *Channel) pushInFlightMessage(msg *Message) error
func (c *Channel) popInFlightMessage(clientID int64, id MessageID) (*Message, error)
func (c *Channel) addToInFlightPQ(msg *Message)
func (c *Channel) removeFromInFlightPQ(msg *Message)
func (c *Channel) processInFlightQueue(t int64) bool

// 延時消費相關
func (c *Channel) StartDeferredTimeout(msg *Message, timeout time.Duration) error
func (c *Channel) pushDeferredMessage(item *pqueue.Item) error
func (c *Channel) popDeferredMessage(id MessageID) (*pqueue.Item, error)
func (c *Channel) addToDeferredPQ(item *pqueue.Item)
func (c *Channel) processDeferredQueue(t int64) bool

3 啟動過程

nsqd的main函式在apps/nsqd/main.go檔案。
啟動時呼叫了一個第三方包svc,主要作用是攔截syscall.SIGINT/syscall.SIGTERM這兩個訊號,最終還是呼叫了main.go下的3個方法:

  • program.Init():windows下特殊操作
  • program.Start():載入引數和配置檔案、載入上一次儲存的Topic資訊並完成初始化、建立nsqd並呼叫p.nsqd.Main()啟動
  • program.Stop():退出處理

p.nsqd.Main()的邏輯也很簡單,程式碼不貼了,依次啟動了TCP服務、HTTP服務、HTTPS服務這3個服務。除此之外,還啟動了以下兩個協程:

  • queueScanLoop:訊息延時/超時處理
  • lookupLoop:服務註冊

TCPServer
protocol包的TCPServer的核心程式碼就是下面這幾行,迴圈等待客戶端連線,併為每個連線建立一個獨立的協程:

func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {
    for {
        // 等待生產者或消費者連線
        clientConn, err := listener.Accept()

        // 每建立一個連線wg +1
        wg.Add(1)
        go func() {
            // 每個連線均啟動一個獨立的協程來接收處理請求
            handler.Handle(clientConn)
            wg.Done()
        }()
    }

    // 等待所有協程退出
    wg.Wait()

    return nil
}

TCPServer的核心是為每個連線啟動的協程處理方法handler.Handle(clientConn),實際呼叫的是下面這個方法,連線建立時先讀取4位元組,必須是" V2",然後啟動prot.IOLoop(clientConn)處理接下來的客戶端請求:

func (p *tcpServer) Handle(clientConn net.Conn) {
    // 無論是生產者還是消費者,建立連線時,必須先發送4位元組的"  V2"進行版本校驗
    buf := make([]byte, 4)
    _, err := io.ReadFull(clientConn, buf)
    protocolMagic := string(buf)

    var prot protocol.Protocol
    switch protocolMagic {
    case "  V2":
        prot = &protocolV2{ctx: p.ctx}
    default:
        protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
        clientConn.Close()
        p.ctx.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
            clientConn.RemoteAddr(), protocolMagic)
        return
    }

    // 版本校驗通過,儲存連線資訊,key-是ADDR,value是當前連線指標
    p.conns.Store(clientConn.RemoteAddr(), clientConn)

    // 啟動
    err = prot.IOLoop(clientConn)
    if err != nil {
        p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
    }

    p.conns.Delete(clientConn.RemoteAddr())
}

4 消費和生產過程

4.1 訊息生產

生產者pub訊息時,訊息會首先寫入對應topic的佇列(記憶體優先,記憶體滿了寫磁碟),topic的messagePump()方法再將訊息拷貝給每個channel。
每個channel均各執一份完整的訊息。

1.訊息寫入topic
訊息生產由生產者呼叫PUB/MPUB/DPUB這類指令實現,底層都是呼叫topic.PutMessage(msg),進一步呼叫topic.put(msg):

func (t *Topic) put(m *Message) error {
    select {
    case t.memoryMsgChan <- m:  // 優先寫入記憶體memoryMsgChan
    default:                    // 當記憶體case失敗即memoryMsgChan滿時,走default,將msg以位元組形式寫入磁碟佇列topic.backend
        b := bufferPoolGet()
        err := writeMessageToBackend(b, m, t.backend)
        bufferPoolPut(b)
        t.ctx.nsqd.SetHealth(err)
        if err != nil {
            t.ctx.nsqd.logf(LOG_ERROR,
                "TOPIC(%s) ERROR: failed to write message to backend - %s",
                t.name, err)
            return err
        }
    }
    return nil
}

訊息寫入topic的邏輯比較簡單,優先寫memoryMsgChan,如果memoryMsgChan滿了,則寫入磁碟佇列topic.backend。
這裡留個思考題:NSQ是否支援不寫記憶體,全部寫磁碟佇列?

2.topic將訊息複製給每個channel
第二章介紹結構體和方法時,介紹了topic結構體的messagePump()方法,正是這個方法將第1步寫入的訊息複製給每個channel的:

func (t *Topic) messagePump() {
    /* 準備工作有程式碼我們略過 */
    // 主訊息處理迴圈
    for {
        select {
        case msg = <-memoryMsgChan:
        case buf = <-backendChan:
            msg, err = decodeMessage(buf)
            if err != nil {
                t.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
                continue
            }
        case <-t.channelUpdateChan:
            chans = chans[:0]
            t.RLock()
            for _, c := range t.channelMap {
                chans = append(chans, c)
            }
            t.RUnlock()
            if len(chans) == 0 || t.IsPaused() {
                memoryMsgChan = nil
                backendChan = nil
            } else {
                memoryMsgChan = t.memoryMsgChan
                backendChan = t.backend.ReadChan()
            }
            continue
        case <-t.pauseChan:
            if len(chans) == 0 || t.IsPaused() {
                memoryMsgChan = nil
                backendChan = nil
            } else {
                memoryMsgChan = t.memoryMsgChan
                backendChan = t.backend.ReadChan()
            }
            continue
        case <-t.exitChan:
            goto exit
        }

        for i, channel := range chans {
            chanMsg := msg
            /* channel消費訊息時,需要處理延時/超時等問題,所以這裡複製了訊息,給每個channel傳遞的是獨立的訊息例項 */
            if i > 0 {
                chanMsg = NewMessage(msg.ID, msg.Body)
                chanMsg.Timestamp = msg.Timestamp
                chanMsg.deferred = msg.deferred
            }
            if chanMsg.deferred != 0 {
                channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
                continue
            }
            err := channel.PutMessage(chanMsg)
            if err != nil {
                t.ctx.nsqd.logf(LOG_ERROR,
                    "TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
                    t.name, msg.ID, channel.name, err)
            }
        }
    }
}

topic.messagePump()方法程式碼還蠻長的,前面是些準備工作,主要就是後面的for迴圈。其中for迴圈中select的前兩項,memoryMsgChan來源於topic.memoryMsgChan,而backendChan則是topic.backend.ReadChan(),分別對應於記憶體和磁碟佇列。注意只有這兩個case會往下傳遞訊息,其他的case處理退出和更新機制的,會continue或exit外層的for迴圈。
雖然通道channel是有序的,但select的case具有隨機性,這就決定了每輪迴圈讀的是記憶體還是磁碟是隨機的,訊息的消費順序是不可控的。
select語句獲取的訊息,交給第2層for迴圈處理,邏輯比較簡單,遍歷每一個chan,呼叫channel.PutMessage()寫入。由於每個channel對應於不同的消費者,有不同的延時/超時和消費機制,所以這裡拷貝了message例項。

4.2 訊息消費

每個連線均會啟動一個執行protocolV2.messagePump()方法的協程,這個協程負責監聽channel的訊息佇列並向客戶端推送訊息。客戶端只有觸發SUB指令之後,才會將channel傳遞給protocolV2.messagePump(),這之後消費推送才會正式開啟。
啟動訊息推送
前面講Tcpserver時有提到,客戶端建立連線時,會呼叫tcpserver.Handle(),裡面再呼叫protocolV2.IOLoop()。protocolV2.IOLoop()方法開頭有下面這行:

    go p.messagePump(client, messagePumpStartedChan)

這行建立了一個獨立執行緒,呼叫的protocolV2.messagePump()負責向消費者推送訊息。
有個小細節是無論是生產者還是消費者,都會建立這個協程,protocolV2.messagePump()建立後並不會立即推送訊息,而是需要呼叫SUB指令,以protocolV2.SUB()方法為例,方法末尾有這麼一行:

    client.SubEventChan <- channel

將當前消費者訂閱的channel傳入client.SubEventChan,這個會由protocolV2.messagePump()接收,這個方法核心是下面這個for迴圈(限於篇幅,我省略了大量無關程式碼):

func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
    for {
        if subChannel == nil || !client.IsReadyForMessages() {
            // the client is not ready to receive messages...
            memoryMsgChan = nil
            backendMsgChan = nil
            flusherChan = nil
            // force flush
            client.writeLock.Lock()
            err = client.Flush()
            client.writeLock.Unlock()
            if err != nil {
                goto exit
            }
            flushed = true
        } else if flushed {
            // last iteration we flushed...
            // do not select on the flusher ticker channel
            memoryMsgChan = subChannel.memoryMsgChan
            backendMsgChan = subChannel.backend.ReadChan()
            flusherChan = nil
        }
        
        select {
        case subChannel = <-subEventChan:
            // you can't SUB anymore
            subEventChan = nil
        case b := <-backendMsgChan:
            if sampleRate > 0 && rand.Int31n(100) > sampleRate {
                continue
            }

            msg, err := decodeMessage(b)
            if err != nil {
                p.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
                continue
            }
            msg.Attempts++

            subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
            client.SendingMessage()
            err = p.SendMessage(client, msg)
            if err != nil {
                goto exit
            }
            flushed = false
        case msg := <-memoryMsgChan:
            if sampleRate > 0 && rand.Int31n(100) > sampleRate {
                continue
            }
            msg.Attempts++

            subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
            client.SendingMessage()
            err = p.SendMessage(client, msg)
            if err != nil {
                goto exit
            }
            flushed = false
        case <-client.ExitChan:
            goto exit
        }
    }
}

客戶端建立連線初始,subChannel為空,迴圈一直走第1個if語句。直到客戶端呼叫SUB指令,select語句執行"case subChannel = <-subEventChan:",此時subChannel非空,接下來backendMsgChan和memoryMsgChan被賦值,此後開始推送訊息:

  • 訊息會隨機從記憶體和磁碟佇列取,因為如果記憶體和磁碟都有資料,select是隨機的
  • 訊息通過protocolV2.SendMessage()推送給消費者

當多個消費者訂閱同一個channel時情況會如何?
上面我們提到消費者發起SUB指令訂閱訊息,protocolV2.SUB()會將chan傳給protocolV2.messagePump(),即這一行“client.SubEventChan <- channel”,那麼我們來看下這個channel變數怎麼來的:

func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) {
    ...
    
    topic := p.ctx.nsqd.GetTopic(topicName)
    channel = topic.GetChannel(channelName)
    
    ...
    client.SubEventChan <- channel

SUB方法包含多種邏輯:

  • 當channel不存在時,topic.GetChannel()方法自動建立並與這個消費者繫結
  • 當channel存在,比如事先通過http-api建立好了,但沒有消費者訂閱,則當前消費者獨立繫結這個channel
  • 當channel存在,且已經有消費者訂閱了,topic.GetChannel()方法依然會返回這個channel,這時就有多個消費者同時訂閱了這個channel,大家共用一個通道chan變數

由於是多個消費者共用一個通道chan變數,每個消費者都有一個for select在迴圈監聽這個通道,根據chan變數的特性,消費會隨機發送給一位消費者,且一條訊息只會推送給一個消費者。

消費超時處理
protocolV2.messagePump()方法,無論是“case b := <-backendMsgChan:”還是“case msg := <-memoryMsgChan:”,在向消費者推送訊息前都呼叫了下面這行程式碼:

func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
    subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout) // 省略其他程式碼
}

func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error {
    msg.pri = now.Add(timeout).UnixNano() // pri成員儲存本訊息超時時間
    err := c.pushInFlightMessage(msg)
    c.addToInFlightPQ(msg)
}

channel.StartInFlightTimeout()將訊息儲存到channel的inFlightMessages和inFlightPQ佇列中,這兩個快取是用來處理消費超時的。
值得注意的一個小細節是c.addToInFlightPQ(msg)將msg壓入最小堆時,將msg在陣列的偏移量儲存到了msg.index成員中(最小堆底層是陣列實現)

我們先簡單看下FIN指令會做啥:

func (p *protocolV2) FIN(client *clientV2, params [][]byte) ([]byte, error) {
    err = client.Channel.FinishMessage(client.ID, *id) // 省略其他程式碼
}

func (c *Channel) FinishMessage(clientID int64, id MessageID) error {
    // 省略其他程式碼
    msg, err := c.popInFlightMessage(clientID, id)

    c.removeFromInFlightPQ(msg)
}

FIN的動作比較簡單,主要就是呼叫channel.FinishMessage()方法把上面寫入超時快取的msg給刪除掉。

FIN從inFlightMessages中刪除訊息比較容易,這是個map,key是msg.id。客戶端傳送FIN訊息時附帶了msg.id。但如何從最小堆inFlightPQ中刪除對應的msg呢?前面提到在入堆時的一個細節,即儲存了msg的偏移量,此時正好用上。通過msg.index直接定位到msg的位置並調整堆即可。

說了這麼多,最小堆的作用是啥?別急,接下來我們看下超時邏輯:
超時邏輯由程式啟動時開啟的工作執行緒組來處理,即NSQD.queueScanLoop()方法:

func (n *NSQD) queueScanLoop() {
    n.resizePool(len(channels), workCh, responseCh, closeCh)

    for {
        select {
        case <-workTicker.C: // 定時觸發工作
            if len(channels) == 0 {
                continue
            }
        case <-refreshTicker.C: // 動態調整協程組的數量
            channels = n.channels()
            n.resizePool(len(channels), workCh, responseCh, closeCh)
            continue
        case <-n.exitChan:
            goto exit
        }

        num := n.getOpts().QueueScanSelectionCount
        if num > len(channels) {
            num = len(channels)
        }

    loop:
        for _, i := range util.UniqRands(num, len(channels)) {
            workCh <- channels[i] // 觸發協程組工作
        }

        numDirty := 0
        for i := 0; i < num; i++ {
            if <-responseCh {
                numDirty++
            }
        }

        if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent {
            goto loop
        }
    }
}

NSQD.queueScanLoop()方法主要有一個for迴圈,內層是一個select和一個loop迴圈。select中,第1個定時器case <-workTicker.C的作用是定時觸發工作,只有這個case會跳出select走到下面的loop。第2個定時器負責啟動工作協程組並動態調整協程數量,我們來看下第2個定時器呼叫的resizePool()方法:

func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
    idealPoolSize := int(float64(num) * 0.25) // 協程數量設定為channel數的1/4
    if idealPoolSize < 1 {
        idealPoolSize = 1
    } else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax {
        idealPoolSize = n.getOpts().QueueScanWorkerPoolMax
    }
    for {
        if idealPoolSize == n.poolSize {        // 當協程數量達到協程數量設定為channel數的1/4時,退出
            break
        } else if idealPoolSize < n.poolSize {  // 否則如果當前協程數大於目標值,則通過closeCh通知部分協程退出
            // contract
            closeCh <- 1
            n.poolSize--
        } else {                                // 否則協程數不夠,則啟動新的協程
            // expand
            n.waitGroup.Wrap(func() {
                n.queueScanWorker(workCh, responseCh, closeCh)
            })
            n.poolSize++
        }
    }
}

resizePool()方法上面的註釋已經說的很清楚了,作用就是保持工作協程數量為當前channel數的1/4。

接下來我們看具體的工作邏輯,queueScanWorker()方法:

func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
    for {
        select {
        case c := <-workCh:
            now := time.Now().UnixNano()
            dirty := false
            if c.processInFlightQueue(now) {
                dirty = true
            }
            if c.processDeferredQueue(now) {
                dirty = true
            }
            responseCh <- dirty
        case <-closeCh:
            return
        }
    }
}

queueScanWorker()方法的程式碼很短,一是監聽closeCh的退出訊號;二是監聽workCh的工作訊號。workCh會將需要處理的channel傳入,然後呼叫processInFlightQueue()清理超時的訊息,呼叫processDeferredQueue()清理到時間的延時訊息:

func (c *Channel) processInFlightQueue(t int64) bool {
    dirty := false
    for {
        c.inFlightMutex.Lock()
        msg, _ := c.inFlightPQ.PeekAndShift(t)
        c.inFlightMutex.Unlock()

        if msg == nil {
            goto exit
        }
        dirty = true

        _, err := c.popInFlightMessage(msg.clientID, msg.ID)
        if err != nil {
            goto exit
        }
        atomic.AddUint64(&c.timeoutCount, 1)
        c.RLock()
        client, ok := c.clients[msg.clientID]
        c.RUnlock()
        if ok {
            client.TimedOutMessage()
        }
        c.put(msg)
    }

exit:
    return dirty
}

func (pq *inFlightPqueue) PeekAndShift(max int64) (*Message, int64) {
    if len(*pq) == 0 {
        return nil, 0
    }

    x := (*pq)[0]
    if x.pri > max {
        return nil, x.pri - max
    }
    pq.Pop()

    return x, 0
}

前面提到msg.pri成員儲存本訊息超時時間,所以PeekAndShift()返回的是最小堆裡已經超時且超時時間最長的那條訊息。processInFlightQueue()則將訊息從超時佇列中刪,同時將訊息重新put進channel。注意此時超時的訊息put進channel後實際是排在隊尾的,消費順序將發生改變。
processInFlightQueue()方法如果存在超時訊息,返回值dirty標識true。queueScanWorker()將dirty寫入responseCh。再往回看,queueScanLoop()方法統計了dirty的數量,超過一定比例會繼續執行loop,而不是等待下一次定時執行。

4.2 延遲消費

生產者呼叫DPUB釋出的訊息,可以指定延時多少再推送給消費者。
我們來看下DPUB的邏輯:

func (p *protocolV2) DPUB(client *clientV2, params [][]byte) ([]byte, error) {
    timeoutMs, err := protocol.ByteToBase10(params[2])
    timeoutDuration := time.Duration(timeoutMs) * time.Millisecond
    msg := NewMessage(topic.GenerateID(), messageBody)
    msg.deferred = timeoutDuration
    err = topic.PutMessage(msg)
}

從上面擷取的PUB()方法程式碼可以看出,DPUB的訊息會將延時時間寫入msg.deferred成員。4.1章節第2部分介紹的Topic.messagePump()方法有下面這段:

func (t *Topic) messagePump() {
            if chanMsg.deferred != 0 {
                channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
                continue
            }
}

當chanMsg.deferred != 0時表示延時訊息,此時不是直接呼叫putMessage()方法寫入channel,而是呼叫channel.PutMessageDeferred(chanMsg, chanMsg.deferred),訊息被寫入了延時佇列Channel.deferredMessages和Channel.deferredPQ。之後的邏輯是在工作協程組NSQD.queueScanLoop()中被識別並put進channel,這與超時的處理邏輯是一樣的,不展開說。