1. 程式人生 > >剖析nsq訊息佇列(二) 去中心化原始碼解析

剖析nsq訊息佇列(二) 去中心化原始碼解析

在上一篇帖子剖析nsq訊息佇列(一) 簡介及去中心化實現原理中,我介紹了nsq的兩種使用方式,一種是直接連線,還有一種是通過nslookup來實現去中心化的方式使用,並大概說了一下實現原理,沒有什麼難理解的東西,這篇帖子我把nsq實現去中心化的原始碼和其中的業物邏輯展示給大家看一下。

nsqd和nsqlookupd的通訊實現

上一篇中在啟動nsqd時我用了以下命令,我指定了一個引數 --lookupd-tcp-address

./nsqd -tcp-address ":8000"  -http-address ":8001" --lookupd-tcp-address=127.0.0.1:8200 --lookupd-tcp-address=127.0.0.1:7200 -data-path=./a

--lookupd-tcp-address 用於指定nsqlookupdtcp監聽地址。

nsqdnsqlookupd的通訊交流簡單來說就是下圖這樣

nsqd啟動後連線nsqlookupd,連線成功後,要傳送一個魔法標識nsq.MagicV1,這個標識有啥魔法麼,當然不是,他只是用於標明,客戶端和服務端雙方使用的資訊通訊版本,不能的版本有不同的處理方式,為了後期做新的訊息處理版本方便吧。
nsqlookupd 的程式碼塊

func (p *tcpServer) Handle(clientConn net.Conn) {   
    // ...
    buf := make([]byte, 4)
    _, err := io.ReadFull(clientConn, buf)
    // ...
    protocolMagic := string(buf)
    // ...
    var prot protocol.Protocol
    switch protocolMagic {
    case "  V1":
        prot = &LookupProtocolV1{ctx: p.ctx}
    default:
        // ...
        return
    }
    err = prot.IOLoop(clientConn)
    //...
}

這個時候的nsqd已經和nsqlookupd建立好了連線,但是這時,僅僅說明他倆連線成功。
nsqlookupd也並沒有把這個連線加到可用的nsqd列表裡。
建立連線完成後,nsqd會發送IDENTIFY命令,這個命令裡包含了nsq的基本資訊
nsqd的程式碼

        ci := make(map[string]interface{})
        ci["version"] = version.Binary
        ci["tcp_port"] = n.RealTCPAddr().Port
        ci["http_port"] = n.RealHTTPAddr().Port
        ci["hostname"] = hostname
        ci["broadcast_address"] = n.getOpts().BroadcastAddress

        cmd, err := nsq.Identify(ci)
        if err != nil {
            lp.Close()
            return
        }
        resp, err := lp.Command(cmd)

包含了nsqd 提供的tcphttp埠,主機名,版本等等,傳送給nsqlookupd,nsqlookupd收到IDENTIFY命令後,解析資訊然後加到nsqd的可用列表裡
nsqlookupd 的程式碼塊

func (p *LookupProtocolV1) IDENTIFY(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
    var err error
    if client.peerInfo != nil {
        return nil, protocol.NewFatalClientErr(err, "E_INVALID", "cannot IDENTIFY again")
    }
    var bodyLen int32
    err = binary.Read(reader, binary.BigEndian, &bodyLen)
    // ...
    body := make([]byte, bodyLen)
    _, err = io.ReadFull(reader, body)
    // ...  
    peerInfo := PeerInfo{id: client.RemoteAddr().String()}
    err = json.Unmarshal(body, &peerInfo)
    // ...
    client.peerInfo = &peerInfo
    // 把nsqd的連線加入到可用列表裡    
    if p.ctx.nsqlookupd.DB.AddProducer(Registration{"client", "", ""}, &Producer{peerInfo: client.peerInfo}) {
        p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) REGISTER category:%s key:%s subkey:%s", client, "client", "", "")
    }
    // ...
    return response, nil
}

然後每過15秒,會發送一個PING心跳命令給nsqlookupd,這樣保持存活狀態,nsqlookupd每次收到發過來的PING命令後,也會記下這個nsqd的最後更新時間,這樣做為一個篩選條件,如果長時間沒有更新,就認為這個節點有問題,不會把這個節點的資訊加入到可用列表。
到此為止,一個nsqd就把自己的資訊註冊到nsqlookupd的可用列表了,我們可以啟動多個nsqd和多個nsqlookupd,為nsqd
指定多個nsqlookupd,就如同我上一篇帖子寫的那樣

--lookupd-tcp-address=127.0.0.1:8200 --lookupd-tcp-address=127.0.0.1:7200

nsqd和所有的nsqlookupd建立連線,註冊服務資訊,並保持心跳,保證可用列表的更新.

nsqlookupd 掛掉的處理方式

上面我們說了nsqd如果出現問題,nsqlookupdnsqd可用列表裡就會處理掉這個連線資訊。如nsqlookupd掛了怎麼辦呢

目前的處理方式是這樣的,
無論是心跳,還是其他命令,nsqd會給所有的nsqlookup傳送資訊,當nsqd發現nsqlookupd出現問題時,在每次傳送命令時,會不斷的進行重新連線:

func (lp *lookupPeer) Command(cmd *nsq.Command) ([]byte, error) {
    initialState := lp.state
    if lp.state != stateConnected {
        err := lp.Connect()
        if err != nil {
            return nil, err
        }
        lp.state = stateConnected
        _, err = lp.Write(nsq.MagicV1)
        if err != nil {
            lp.Close()
            return nil, err
        }
        if initialState == stateDisconnected {
            lp.connectCallback(lp)
        }
        if lp.state != stateConnected {
            return nil, fmt.Errorf("lookupPeer connectCallback() failed")
        }
    }
    // ...
}

如果連線成功,會再次呼叫connectCallback方法,進行IDENTIFY命令的呼叫等。

客戶端和nsqlookupd、nsqd的通訊實現

上一篇帖子裡介紹了,客戶端如何連線nsqlookupd來進行通訊

    adds := []string{"127.0.0.1:7201", "127.0.0.1:8201"}
    config := nsq.NewConfig()
    config.MaxInFlight = 1000
    config.MaxBackoffDuration = 5 * time.Second
    config.DialTimeout = 10 * time.Second

    topicName := "testTopic1"
    c, _ := nsq.NewConsumer(topicName, "ch1", config)
    testHandler := &MyTestHandler{consumer: c}

    c.AddHandler(testHandler)
    if err := c.ConnectToNSQLookupds(adds); err != nil {
        panic(err)
    }

需要注意adds裡地址的埠,是nsqlookupdhttp
這裡我還使用上一篇帖子中的圖,給大家詳細分析

呼叫方法c.ConnectToNSQLookupds(adds),他的實現是訪問nsqlookupd的http埠http://127.0.0.1:7201/lookup?topic=testTopic1得到提供consumer訂閱的topic所有的producers節點資訊, url返回的資料資訊如下。

{
  "channels": [
    "nsq_to_file",
    "ch1"
  ],
  "producers": [
    {
      "remote_address": "127.0.0.1:58606",
      "hostname": "li-peng-mc-macbook.local",
      "broadcast_address": "li-peng-mc-macbook.local",
      "tcp_port": 8000,
      "http_port": 8001,
      "version": "1.1.1-alpha"
    },
    {
      "remote_address": "127.0.0.1:58627",
      "hostname": "li-peng-mc-macbook.local",
      "broadcast_address": "li-peng-mc-macbook.local",
      "tcp_port": 7000,
      "http_port": 7001,
      "version": "1.1.1-alpha"
    }
  ]
}


方法queryLookupd就是進行的上圖的操作

  • 得到提供訂閱的topicnsqd列表
  • 進行連線
func (r *Consumer) queryLookupd() {
    retries := 0
retry:
    endpoint := r.nextLookupdEndpoint()

    // ...  
    err := apiRequestNegotiateV1("GET", endpoint, nil, &data)
    if err != nil {
        // ...
    }
    var nsqdAddrs []string
    for _, producer := range data.Producers {
        broadcastAddress := producer.BroadcastAddress
        port := producer.TCPPort
        joined := net.JoinHostPort(broadcastAddress, strconv.Itoa(port))
        nsqdAddrs = append(nsqdAddrs, joined)
    }
    // 進行連線
    for _, addr := range nsqdAddrs {
        err = r.ConnectToNSQD(addr)
        if err != nil && err != ErrAlreadyConnected {
            r.log(LogLevelError, "(%s) error connecting to nsqd - %s", addr, err)
            continue
        }
    }
}

如何重新整理nsqd的可用列表

有新的nsqd加入,是如何處理的呢?
在呼叫ConnectToNSQLookupd時會啟動一個協程go r.lookupdLoop() 呼叫方法lookupdLoop的定時迴圈訪問 queryLookupd 更新 nsqd的可用列表

// poll all known lookup servers every LookupdPollInterval
func (r *Consumer) lookupdLoop() {
    // ...
    var ticker *time.Ticker
    select {
    case <-time.After(jitter):
    case <-r.exitChan:
        goto exit
    }
    // 設定Interval 來迴圈訪問 queryLookupd
    ticker = time.NewTicker(r.config.LookupdPollInterval)
    for {
        select {
        case <-ticker.C:
            r.queryLookupd()
        case <-r.lookupdRecheckChan:
            r.queryLookupd()
        case <-r.exitChan:
            goto exit
        }
    }

exit:
    // ...
}

處理 nsqd 的單點故障


當有nsqd出現故障時怎麼辦?當前的處理方式是

  • nsqdlookupd會把這個故障節點從可用列表中去除,客戶端從介面得到的可用列表永遠都是可用的。
  • 客戶端會把這個故障節點從可用節點上移除,然後要去判斷是否使用了nsqlookup進行了連線,如果是則case r.lookupdRecheckChan <- 1 去重新整理可用列表queryLookupd,如果不是,然後啟動一個協程去定時做重試連線,如果故障恢復,連線成功,會重新加入到可用列表.
    客戶端實現的程式碼
func (r *Consumer) onConnClose(c *Conn) {
    // ...
    // remove this connections RDY count from the consumer's total
    delete(r.connections, c.String())
    left := len(r.connections)
    // ...
    r.mtx.RLock()
    numLookupd := len(r.lookupdHTTPAddrs)
    reconnect := indexOf(c.String(), r.nsqdTCPAddrs) >= 0
    // 如果使用的是nslookup則去重新整理可用列表
    if numLookupd > 0 {
        // trigger a poll of the lookupd
        select {
        case r.lookupdRecheckChan <- 1:
        default:
        }
    } else if reconnect {
        // ... 
        }(c.String())
    }
}