1. 程式人生 > >剖析nsq訊息佇列(一) 簡介及去中心化實現原理

剖析nsq訊息佇列(一) 簡介及去中心化實現原理

分散式訊息佇列nsq,簡單易用,去中心化的設計使nsq更健壯,nsq充分利用了go語言的goroutinechannel來實現的訊息處理,程式碼量也不大,讀不了多久就沒了。後期的文章我會把nsq的原始碼分析給大家看。
主要的分析路線如下

  • 分析nsq的整體框架結構,分析如何做到的無中心化分散式拓撲結構,如何處理的單點故障。
  • 分析nsq是如何保證訊息的可靠性,如何保證訊息的處理,對於訊息的持久化是如何處理和擴充套件的。
  • 分析nsq是如何做的訊息的負載處理,即如何把合理的、不超過客戶端消費能力的情況下,把訊息分發到不同的客戶端。
  • 分析nsq提供的一些輔助元件。

這篇帖子,介紹nsq的主體結構,以及他是如何做到去中心化的分散式拓撲結構,如何處理的單點故障。

幾個元件是需要先大概說一下
nsqd 訊息佇列的主體,對訊息的接收,處理和把訊息分發到客戶端。
nsqlookupd nsq拓撲結構資訊的管理者,有了他才能組成一個簡單易用的無中心化的分散式拓撲網路結構。
go-nsq nsq官方的go語言客戶端,基本上市面上的主流程式語言都有相應的客戶端在這裡
還有視覺化的元件nsqadmin和一些工具像nsq_to_filensq_stat、等等,這些在後期的帖子裡會介紹

使用方式

直連方式

nsqd是獨立執行的,我們可以直接使用部署幾個nsqd然後使用客戶端直連的方式使用

例子

目前資源有限,我就都在一臺機器上模擬了
啟動兩個nsqd

./nsqd -tcp-address ":8000"  -http-address ":8001" -data-path=./a
./nsqd -tcp-address ":7000"  -http-address ":7001" -data-path=./b

正常啟動會有類似下面的輸出

[nsqd] 2019/08/29 18:42:56.928345 INFO: nsqd v1.1.1-alpha (built w/go1.12.7)
[nsqd] 2019/08/29 18:42:56.928512 INFO: ID: 538
[nsqd] 2019/08/29 18:42:56.928856 INFO: NSQ: persisting topic/channel metadata to b/nsqd.dat
[nsqd] 2019/08/29 18:42:56.935797 INFO: TCP: listening on [::]:7000
[nsqd] 2019/08/29 18:42:56.935891 INFO: HTTP: listening on [::]:7001

簡單使用

func main() {
    adds := []string{"127.0.0.1:7000", "127.0.0.1:8000"}
    config := nsq.NewConfig()

    topicName := "testTopic1"
    c, _ := nsq.NewConsumer(topicName, "ch1", config)
    testHandler := &MyTestHandler{consumer: c}

    c.AddHandler(testHandler)
    if err := c.ConnectToNSQDs(adds); err != nil {
        panic(err)
    }
    stats := c.Stats()
    if stats.Connections == 0 {
        panic("stats report 0 connections (should be > 0)")
    }
    stop := make(chan os.Signal)
    signal.Notify(stop, os.Interrupt)
    fmt.Println("server is running....")
    <-stop
}

type MyTestHandler struct {
    consumer *nsq.Consumer
}

func (m MyTestHandler) HandleMessage(message *nsq.Message) error {
    fmt.Println(string(message.Body))
    return nil
}

方法 c.ConnectToNSQDs(adds),連線多個nsqd服務
然後執行多個客戶端實現
這時,我們傳送一個訊息,

curl -d 'hello world 2' 'http://127.0.0.1:7001/pub?topic=testTopic1'

nsqd會根據他的演算法,把訊息分配到一個客戶端
客戶端的輸入如下

2019/08/30 12:05:32 INF    1 [testTopic1/ch1] (127.0.0.1:7000) connecting to nsqd
2019/08/30 12:05:32 INF    1 [testTopic1/ch1] (127.0.0.1:8000) connecting to nsqd
server is running....
hello world 2

但是這種做的話,需要客戶端做一些額外的工作,需要頻繁的去檢查所有nsqd的狀態,如果發現出現問題需要客戶端主動去處理這些問題。

總結

我使用的客戶端庫是官方庫 go-nsq,使用直接連nsqd的方式,

  • 如果有nsqd出現問題,現在的處理方式,他會每隔一段時間執行一次重連操作。想去掉這個連線資訊就要額外做一些處理了。
  • 如果對nsqd進行橫向擴充,只能是自己民額外的寫一些程式碼呼叫ConnectToNSQDs或者ConnectToNSQD方法

去中心化連線方式 nsqlookupd

官方推薦使用連線nsqlookupd的方式,nsqlookupd用於做服務的註冊和發現,這樣可以做到去中心化。

圖中我們執行著多個nsqd和多個nsqlookupd的例項,客戶端去連線nsqlookupd來操作nsqd

例子

我們要先啟動nsqlookupd,為了演示方便,我啟動兩個nsqlookupd例項, 三個nsqd例項

./nsqlookupd -tcp-address ":8200" -http-address ":8201"
./nsqlookupd -tcp-address ":7200" -http-address ":7201"

為了演示橫向擴充,先啟動兩個,客戶端連線後,再啟動第三個。

./nsqd -tcp-address ":8000"  -http-address ":8001" --lookupd-tcp-address=127.0.0.1:8200 --lookupd-tcp-address=127.0.0.1:7200 -data-path=./a
./nsqd -tcp-address ":7000"  -http-address ":7001" --lookupd-tcp-address=127.0.0.1:8200 --lookupd-tcp-address=127.0.0.1:7200  -data-path=./b

--lookupd-tcp-address 用於指定lookup的連線地址

客戶端簡單程式碼

package main

import (
    "fmt"
    "os"
    "os/signal"
    "time"

    "github.com/nsqio/go-nsq"
)

func main() {
    adds := []string{"127.0.0.1:7201", "127.0.0.1:8201"}
    config := nsq.NewConfig()
    config.MaxInFlight = 1000
    config.MaxBackoffDuration = 5 * time.Second
    config.DialTimeout = 10 * time.Second

    topicName := "testTopic1"
    c, _ := nsq.NewConsumer(topicName, "ch1", config)
    testHandler := &MyTestHandler{consumer: c}

    c.AddHandler(testHandler)
    if err := c.ConnectToNSQLookupds(adds); err != nil {
        panic(err)
    }
    stats := c.Stats()
    if stats.Connections == 0 {
        panic("stats report 0 connections (should be > 0)")
    }
    stop := make(chan os.Signal)
    signal.Notify(stop, os.Interrupt)
    fmt.Println("server is running....")
    <-stop
}

type MyTestHandler struct {
    consumer *nsq.Consumer
}

func (m MyTestHandler) HandleMessage(message *nsq.Message) error {
    fmt.Println(string(message.Body))
    return nil
}

方法ConnectToNSQLookupds就是用於連線nsqlookupd的,但是需要注意的是,連線的是http72018201,庫go-nsq 是通過請求其中一個nsqlookupd的 http 方法http://127.0.0.1:7201/lookup?topic=testTopic1 來得到所有提供topic=testTopic1nsqd 列表資訊,然後對所有的nsqd進行連線,

2019/08/30 13:47:26 INF    1 [testTopic1/ch1] querying nsqlookupd http://127.0.0.1:7201/lookup?topic=testTopic1
2019/08/30 13:47:26 INF    1 [testTopic1/ch1] (li-peng-mc-macbook.local:7000) connecting to nsqd
2019/08/30 13:47:26 INF    1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) connecting to nsqd

目前我們已經連線了兩個。
我們演示一下橫向擴充,啟動第三個nsqd

./nsqd -tcp-address ":6000"  -http-address ":6001" --lookupd-tcp-address=127.0.0.1:8200 --lookupd-tcp-address=127.0.0.1:7200  -data-path=./c

這裡會有一個問題,當我啟動了一個親的nsqd但是他的topic是空的,我們需指定這新的nsqd處理哪些topic。
我們可以用nsqadmin檢視所有的topic

./nsqadmin  --lookupd-http-address=127.0.0.1:8201 --lookupd-http-address=127.0.0.1:7201

然後去你的nsqd上去建topic

curl -X POST 'http://127.0.0.1:6001/topic/create?topic=testTopic1'

當然也可以自己寫一些自動化的角本
檢視客戶端的日誌輸出

2019/08/30 14:56:01 INF    1 [testTopic1/ch1] querying nsqlookupd http://127.0.0.1:7201/lookup?topic=testTopic1
2019/08/30 14:56:01 INF    1 [testTopic1/ch1] (li-peng-mc-macbook.local:6000) connecting to nsqd

已經連上我們的新nsqd

我手動關閉一個nsqd例項
客戶端的日誌輸出已經斷開了連線

2019/08/30 15:04:20 ERR    1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) IO error - EOF
2019/08/30 15:04:20 INF    1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) beginning close
2019/08/30 15:04:20 INF    1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) readLoop exiting
2019/08/30 15:04:20 INF    1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) breaking out of writeLoop
2019/08/30 15:04:20 INF    1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) writeLoop exiting
2019/08/30 15:04:20 INF    1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) finished draining, cleanup exiting
2019/08/30 15:04:20 INF    1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) clean close complete
2019/08/30 15:04:20 WRN    1 [testTopic1/ch1] there are 2 connections left alive

並且nsqdnsqlookupd也斷開了連線,客戶端再次從nsqlookupd取所有的nsqd的地址時得到的總是可用的地址。

去中心化實現原理

nsqlookupd用於管理整個網路拓撲結構,nsqd用他實現服務的註冊,客戶端使用他得到所有的nsqd服務節點資訊,然後所有的consumer端連線
實現原理如下,

  • nsqd把自己的服務資訊廣播給一個或者多個nsqlookupd
  • 客戶端連線一個或者多個nsqlookupd,通過nsqlookupd得到所有的nsqd的連線資訊,進行連線消費,
  • 如果某個nsqd出現問題,down機了,會和nsqlookupd斷開,這樣客戶端nsqlookupd得到的nsqd的列表永遠是可用的。客戶端連線的是所有的nsqd,一個出問題了就用其他的連線,所以也不會受影響。