高效能訊息佇列NSQ
前言
最近我再網上尋找使用golang實現的mq,因為我知道golang一般實現的應用部署起來很方便,所以我就找到了一個叫做nsq的mq,其實它並不能完全稱為佇列,但是它的輕量和效能的高效,讓我真的大開眼界。
安裝和部署
官網提供
如果你有能力的話直接閱讀官方的說明進行操作就可以了
https://nsq.io/overview/quick_start.html如果看不懂我還找到了中文翻譯過的:
http://wiki.jikexueyuan.com/project/nsq-guide/
簡單部署
下面是我使用的最快部署測試方式,使用伺服器環境centos7.4,防火牆開放埠4160,4161,4151
4171
1、在下載頁面下載對應版本(可能有的時候需要科學上網)
https://nsq.io/deployment/installing.html
這裡使用linux版本
nsq-1.1.0.linux-amd64.go1.10.3.tar.gz
2、將包上傳至伺服器後解壓;
tar xvf nsq-1.1.0.linux-amd64.go1.10.3.tar.gz
3、進入bin目錄 cd nsq-1.1.0.linux-amd64.go1.10.3/bin
4、後臺啟動三個服務
nohup ./nsqlookupd > /dev/null 2>&1 &
nohup ./nsqd --lookupd-tcp-address=127.0.0.1:4160 > /dev/null 2>&1 &
nohup ./nsqadmin --lookupd-http-address=127.0.0.1:4161 > /dev/null 2>&1 &
簡單使用
1、使用
curl -d 'hello world' ' http://127.0.0.1:4151/pub?topic=test '
會建立一個test主題,併發送一個hello world訊息
3、使用
./nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161
消費test中剛才的訊息,並輸出到伺服器/tmp目錄中
特性
官方給出的文件給出了很多特性的說明,針對於一個MQ來說,我認為下面幾個特性是你必須知道的:
-
預設一開始訊息不是持久化的
nsq採用的方式時記憶體+硬碟的模式,當記憶體到達一定程度時就會將資料持久化到硬碟
1、如果將 --mem-queue-size 設定為 0,所有的訊息將會儲存到磁碟。
2、但是即使伺服器重啟也會將當時在記憶體中的訊息持久化
-
訊息是沒有順序的
這一點很關鍵,由於nsq使用記憶體+磁碟的模式,而且還有requeue的操作,所以傳送訊息的順序和接收的順序可能不一樣
-
官方不推薦使用客戶端發訊息
官方提供相應的客戶端傳送訊息,但是HTTP可能更方便一些
-
沒有複製
nsq節點相對獨立,節點與節點之間沒有複製或者叢集的關係。
-
沒有鑑權相關模組
當前release版本的nsq沒有鑑權模組,只有版本v0.2.29+高於這個的才有
-
幾個小點
topic名稱有長度限制,命名建議用下劃線連線;
訊息體大小有限制;
優缺點
優點:
1、部署極其方便,沒有任何環境依賴,直接啟動就行
2、輕量沒有過多的配置引數,只需要簡單的配置就可以直接使用
3、效能高
4、訊息不存在丟失的情況
缺點:
1、訊息無順序
2、節點之間沒有訊息複製
3、沒有鑑權
多節點部署
基本概念
nsqd:基本的節點
nsqlookupd:彙總節點資訊,提供查詢和管理topic等服務
nsqadmin:管理端展示UI介面,能有一個web頁面去檢視和操作
結構


最簡單的多節點部署可以是這樣的一個結構
部署步驟和命令
PS:後臺啟動使用nohup即可,下面只是為了說明啟動方式和命令引數
第一步需要啟動nsqlookupd
./nsqlookupd
預設佔用4161和4160兩個埠
使用-http-address和-tcp-address可以修改
第二步啟動兩個nsqd
./nsqd -lookupd-tcp-address=192.168.1.102:4160 -broadcast-address=192.168.1.103 -data-path="/temp/nsq"
其中
-lookupd-tcp-address為上面nsqlookupd的IP和tcp的埠4160
-broadcast-address我填寫的是自己的IP,這個IP官網上寫的是會註冊到nsqlookupd
-data-path為訊息持久化的位置
第三步啟動nsqadmin
./nsqadmin -lookupd-http-address=192.168.4.102:4161
同樣需要指定-lookupd-http-address但是這次是http的埠也就是4161因為admin通過http請求來查詢相關資訊
後續擴充套件
上面只是最簡單的兩個節點的部署,如果後續想擴充套件就會如下


其中nginx是可以不需要的,你可以果斷選擇同時向多個節點發送訊息,或者當訊息沒有處理的時候重新進行傳送,因為這樣也是nsq設計之初的考慮。你也可以根據自己的需要設計你自己的架構。
客戶端
官方提供了很多語言接入的客戶端 https://nsq.io/clients/client_libraries.html
針對訊息生產者的客戶端,官方還推薦直接使用post請求傳送訊息,如:
curl -d 'hello world' ' http://127.0.0.1:4151/pub?topic=test '
表示向test主題傳送hello world這個訊息
下面介紹兩種客戶端,一種是golang的客戶端,一種是java的客戶端
Golang的客戶端
其中192.168.4.102:4150為傳送訊息的地址,消費者裡面寫的也是相同的地址就可以了。
生產者:
package main import ( "github.com/nsqio/go-nsq" "time" ) func main() { for i := 0 ; i < 10; i++{ sendMessage() } time.Sleep(time.Second * 10) } func sendMessage() { url := "192.168.4.102:4150" producer, err := nsq.NewProducer(url, nsq.NewConfig()) if err != nil { panic(err) } err = producer.Publish("test", []byte("hello world")) if err != nil { panic(err) } producer.Stop() }
消費者:
package main import ( "fmt" "github.com/nsqio/go-nsq" "sync" ) func main() { testNSQ() } type NSQHandler struct { } func (this *NSQHandler) HandleMessage(msg *nsq.Message) error { fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body)) return nil } func testNSQ() { url := "192.168.4.102:4150" waiter := sync.WaitGroup{} waiter.Add(1) go func() { defer waiter.Done() config:=nsq.NewConfig() config.MaxInFlight=9 for i := 0; i<10; i++ { consumer, err := nsq.NewConsumer("test", "struggle", config) if nil != err { fmt.Println("err", err) return } consumer.AddHandler(&NSQHandler{}) err = consumer.ConnectToNSQD(url) if nil != err { fmt.Println("err", err) return } } select{} }() waiter.Wait() }
Java的客戶端
說實話java的客戶端確實用的人比較少,因為我看到實際在github上面的星星和關注就比較少,所以客戶端多多少少都存在一些問題。nsq-j和JavaNSQClient是官方排的考前的客戶端。
這裡說一下nsq-j
https://github.com/sproutsocial/nsq-j生產者
Publisher publisher = new Publisher("192.168.4.102:4150"); System.out.print(publisher); byte[] data = "Hello nsq".getBytes(); publisher.publish("example_topic", data); publisher.publish("example_topic", data); // 注意這裡需要這樣關閉,不然的話就阻塞住了 publisher.getClient().stop();
消費者
public class PubExample { public static void handleData(byte[] data) { System.out.println("Received:" + new String(data)); } public static void main(String[] args) { Subscriber subscriber = new Subscriber("192.168.4.102:4161"); subscriber.subscribe("test", "struggle", PubExample::handleData); } }
需要注意的是其中192.168.4.102:4161這個是nsqlookupd的http地址和埠和生產者是不一樣的
java客戶端是根據nsqlookupd來找到對應消費埠
所以啟動nsqlookupd的時候需要注意,啟動nsqd需要加上引數--broadcast-address
如:./nsqd --lookupd-tcp-address=127.0.0.1:4160 --broadcast-address=192.168.4.102
這樣java消費者才能找到對應的地址否則會出現
ERROR com.sproutsocial.nsq.Subscription - error connecting to:localhost.localdomain:4150
java.net.UnknownHostException: localhost.localdomain
這樣類似的錯誤
我建議的客戶端
官方也說了,傳送訊息其實不建議使用客戶端,而建議使用http請求,所以我自己是使用okhttp進行訊息的傳送,案例如下:
OkHttpClient client = new OkHttpClient(); MediaType mediaType = MediaType.parse("application/json"); RequestBody body = RequestBody.create(mediaType, "{"code": 1}"); Request request = new Request.Builder() .url("http://192.168.4.102:4151/pub?topic=test") .post(body) .addHeader("Content-Type", "application/json") .build(); Response response = client.newCall(request).execute(); System.out.println(response);
當然這裡沒有對client進行配置,這就涉及okhttp了,這裡不再贅述
至於消費端還是使用nsq-j的
總結
使用下來我們可以看到,nsq為了提供效能在一些方面是做出了妥協的,我們可以總結出下面幾個方面供大家參考:
1、暫時nsq的鑑權功能在高版本才支援,但是高版本沒有release所以建議nsq在內網環境下使用,或者在一些安全的埠使用,避免被攻擊
2、部署節點在3個以上,nsq已經對於訊息丟失做了很多的考慮,基本上不會出現丟失的情況,在你考慮冪等性的情況下,同時部署多個節點有利於訊息進行處理
3、如果對訊息順序有要求的情況下,nsq是不能使用的,因為nsq不能保證訊息的順序
4、節點之間沒有訊息複製,所以即使多個節點部署,萬一節點出現問題,還是有一段時間會出現訊息無法接收到的情況,所以向多個節點同時傳送訊息也是一種解決方式
5、因為nsq拋棄了一些東西,那麼所帶來的自然是方便,整體使用下來主要感受就是輕量,部署和配置都很方便,而且對於節點的監控能有介面
希望後續nsq能在幾個版本更新之後能給我們帶來更加牛逼的表現。