1. 程式人生 > >RabbitMQ 入門 (Go) - 3. 模擬感測器,生成資料併發布

RabbitMQ 入門 (Go) - 3. 模擬感測器,生成資料併發布

現在,我們需要模擬感測器,生成資料,併發布到 RabbitMQ。

建立感測器專案

在 GOPATH src 下建立資料夾 sensors,使用 go mod init 初始化,並建立 main.go。

同時別忘了安裝 amqp 的包:go get -u github.com/streadway/amqp

 

我們要生成一些模擬資料,生成資料有一定的範圍(位於一個最大值和最小值之間),如下圖:

 

因此,我們需要這樣幾個配置引數:

  1. 感測器的名稱

  2. 感測器資料的更新頻率

  3. 模擬生成資料的最大值

  4. 模擬生成資料的最小值

  5. 與前一次生成資料的差值的最大值(變化幅度的最大值)

 

設定命令列引數並讀取

在這個專案裡,我們需要通過命令列引數來傳遞配置,並在 Go 程式裡面進行解析和讀取。我們可以使用 os.Args 來搞這些命令列引數,但是更好的辦法是使用 flag 這個包(其內部實現使用的也是 os.Args)。

 

我們先看程式碼:

  1. 第 5-9 行,我們聲明瞭 5 個命令列引數。都是使用 flag 包下相應的函式實現的。

    1. 這幾個命令列引數分別表示感測器名稱、模擬資料的更新頻率、模擬資料的最大值、最小值以及變化幅度的最大值。

    2. 這些命令列引數的型別分別是 string,uint,float64,float64,float64。

    3. 這些函式的引數都類似:

      1. 第一個引數是命令列引數的名稱

      2. 第二個引數是命令列引數的預設值

      3. 第三個引數是引數的描述/幫助

  2. 在 main 函式裡,我們呼叫     flat.Parse() 函式,就可以將命令列的引數值解析到 5-9 行宣告變數裡面。

 

我們測試一下,命令列輸入 go run . --help,其結果如下:

 

生成模擬資料

要生成模擬感測器的資料,需要使用到 math/rand 和 time 這兩個包。

先看程式碼:

  1. 第 17 行,我們需要一個 *rand.Rand 型別來生成隨機數,它又需要一個源,這裡使用 time.Now().UnixNano() 生成源,這樣做的好處是因為這個時間納秒數永遠不會重複。

  2. 第 19 行,宣告 value,它表示感測器的數值,在這先生成一個初始值。

  3. 第 20 行,是額定值,在這裡也就是最大值最小值的中間平均值。

  4. 第 25 行,把更新頻率(每秒更新的次數)轉化為了兩次更新之間的時間間隔(毫秒),並解析成 time.Duration 這個型別。

  5. 第 26 行,time.Tick 函式會返回一個 time 的 Channel,該函式會按照提供的時間間隔不斷觸發,並向這個 Channel 傳送當前時間。

  6. 第 28 行,使用 for range 來處理 signal 這個 Channel,每次 Channel 中有資料傳遞過來,我們就使用 calcValue 這個函式來生成新的模擬資料。

  7. 第 29 行,把生成的最新資料列印一下即可。

 

calcValue 函式

生成模擬資料的邏輯是如果資料偏離額定值,那麼儘量讓下次生成的值向額定值靠攏。

這部分可根據自己的特定需求來實現,不必和我的相同。

先看程式碼:

  1. 第 35 行,聲明瞭 maxStep 和 minStep 兩個變數,表示本次更新相比上次所能夠發生的最大變化和最小變化幅度。

  2. 第 36 - 42     行,區分當前值大於額定值或小於額定值兩種情況,按不同的邏輯得出 maxStep 和 minStep

  3. 第 44 行,使用 maxStep     和 minStep 以及隨機數生成新的 value 資料。

 

執行 sensors 專案

使用 go run . 執行,命令列引數使用預設值即可:

一切正常的話,它就會每秒鐘生成 5 次資料。

 

如何執行多個感測器

生產環境中,通常會接收來自多個感測器的資料。

這裡,我們讓每個感測器都設定自己的路由 Key,所以 RabbitMQ 將會為每個 Key 建立一個 Queue:

但是這也會引起問題,就是之前章節裡面的那個協調程式如何發現這些感測器呢?

首先,我們可以讓每個感測器使用路由 Key 向一個所有感測器和協調程式都知曉的路徑中傳送一個訊息。但這隻能解決問題的一半,另一半我們以後再說。

 

將感測器資料釋出到 RabbitMQ

建立感測器的訊息型別

這裡會使用到 encoding/gob 包。

看程式碼:

  • 在 sensors 包中建立 model 包,並建立 models.go 檔案。

  • 在 models.go 的第 12 行,建立 SensorMessage 作為感測器傳遞訊息的型別,裡面包含三個欄位分別是感測器名稱、數值和時間戳。

  • 很顯然我們不能把 Go 的 struct 型別直接扔到 RabbitMQ 裡面,但我們專案中的各種客戶端只涉及到 Go 語言,所以在這裡我使用 Go 語言的 gob 來對訊息進行編碼,這樣會更高效一些。如果這個專案是跨語言的我可能會使用 JSON 或 Protocol Buffers。

  • 在 model 包的 init 函式裡面,需要使用 gob 包的 Register 函式把將要編碼的型別進行註冊,這樣依賴於這個包的其它 Go 程式就可以把     SensorMessage 這個型別的訊息物件傳送過去了

 

建立 Queue 相關的工具包

建立 tools 包,並建立 queuetools.go 檔案,其內容如下:

程式碼內容與之前的專案類似,就不解釋了。

 

釋出感測器資料到 RabbitMQ

這裡還會使用到 bytes 包。

回到 main.go,修改程式碼:

  1. 前面添加了獲取 Channel 和 Queue 的程式碼。其中第 37 行比較重要,因為我們不能保證在程式執行時,使用 Queue 名稱作為路由 Key 的 Queue 存在,而使用 GetQueue 函式,就可以保證這個 Queue 會被正確的設定,並準備好被我們使用了。

  2. 第 42 行,使用 bytes 包建立了一個 *bytes.Buffer,它用來來承載編碼後的資料,這個 Buffer 可以重複利用,所以實在 for range 的外部宣告的。

    1. 但是每次使用 Buffer 都需要進行重置,也就是第 53 行的作用,這樣以前的資料就會被移除,Buffer 的指標會回到初始位置。

  3. 第 43 行,使用 gob 和 Buffer 來建立編碼器 。

  4. 第 54 行,使用 編碼器的 Encode 方法對訊息進行編碼。

  5. 第 56 行,建立要傳送給 RabbitMQ 的訊息(amqp.Publishing 型別),這裡只需要填寫 Body 欄位即可,其它的欄位根據自己的需求選填即可。

  6. 第 60 行,使用 Channel 來發布訊息,這裡使用的是預設的 Exchange,路由 key 就是 Queue 的名字,最後一個引數就是釋出的訊息。

 

執行程式

執行 sensors 包:

 

開啟控制檯:

可以看到傳送頻率確實是每秒 5 次。

 

開啟 sensor Queue:

目前已經有 384 條訊息了,都沒有被髮送。

 

隨便點開一個訊息檢視其內容:

可以看到 Body 應該是 Base64 編碼的。因為 gob 編碼器使用的是二進位制訊息格式,儘可能的高效,所以在控制檯裡面它沒有一個有意義的表述展示。

 

然後,先停止執行程式。

 

感測器上線時通知協調程式

最後我們就來處理上面那個問題:當感測器上線的時候,得讓協調程式知道,併發送資料。

因為每個感測器都建立了一個自己的 Queue,所以在沒有幫助的情況下,協調程式將無法有效知道這些感測器。

這個問題實際上具體需要做兩件事,我們先來做第一件事:

多個感測器他們 Queue 的名稱是不一樣的,是動態的,所以我們需要一個大家都知道的 Queue,它用來將每個新建立的感測器的 Queue 名稱傳送給協調程式。

 

首先,在 queuetools.go 裡面新增這個 Queue 的名稱,使用一個常量儲存:

 

然後,在 main.go 裡,使用這個名稱建立一個 Queue,並將感測器的 Queue 的名稱釋出上去:

 

再次執行 sensor 包

開啟控制檯:

可以看到 SensorList Queue 出現了。

 

進入到 SensorList Queue,看它的 Message:

 

可以看到當前這一個感測器的名字 sensor 就在裡面。

  &nbs