1. 程式人生 > >開源MQTT中介軟體:moquette

開源MQTT中介軟體:moquette

經常會在專案中用到訊息傳遞,在不同的場景下,訊息傳遞的要求是不一樣的。java世界中,jms的規範可遵循,同時也有開源的相關軟體來支援。 本文來說說一下mqtt,以及moquette。在選擇mqtt的中介軟體時較為糾結,對於非大眾化的開源框架的使用沒有底氣。好在有原始碼,研究原始碼,經過大量測試,效果還可以,推薦給大家。

溝通交流群:

image

經測試過程發現moquette存在一些問題,已修改,可能是認識的問題,也可能是出發點不一樣。

總之,修改如下:
1. 修改訊息佇列長度為32,避免了原來訊息佇列超過最大條數之後,publish出錯的情況 修改了storage的建構函式,使其更通用
2. 修改了每次都對clientId的判定,針對client首次連結的情況 修改了離線訊息簽收時的空指標異常 棄用了一些不常用的模組
3. 添加了redis儲存實現 redis採用了現有conf配置機制 重新設計了session的儲存結構,以便後續新增分片處理
4. 修改工程的結構,獨立了common模組,同時將redis,mapdb,broker建在common基礎上
5. 針對publish的記憶體洩漏,進行了修改(原本以為是netty洩露),經過兩天的不眠不休的除錯,發現是moquette未回收導致

使用說明文件:

1.簡介

Moquette是一款開源的訊息代理,整個系統基於java開發,以netty為基礎完整實現了MQTT協議的。
基於測試,moquette的客戶端承載量及訊息的推送速度都比較客觀,在大批量頻繁短線上線的情景下,也可以承受。
Moquette程式碼是完全開源的,測試過程中的問題進行了一定的修改,擴充套件實現了基於redis儲存的機制。

2.使用

2.1配置檔案

Moquette所使用的配置檔案位於其根目錄下的config裡,包括以下:

  1. acl.conf 許可權配置
  2. hazelcast.xml 叢集配置
  3. password_file.conf 使用者密碼配置
  4. moquette.conf 主配置

下面將詳細講解各配置檔案

許可權配置

基於檔案的許可權配置較為複雜,以下為示例格式,將針對該示例具體說明。

user admin
topic write mqtt/log
pattern write mqtt/log/+
topic read mqtt/lost
user client
topic read mqtt/log
pattern read mqtt/log/%c
topic write mqtt/lost
  1. [user admin] 指示一個使用者admin。其後的條目,代表該使用者的相關topic的讀寫許可權,一直到另一個user結束。

  2. [topic write wifi/log] 代表隊wifi/log主題具有write許可權,topic命令指定特定的主題名稱,不能帶有萬用字元。

  3. [pattern write wifi/log/+] 使用萬用字元指示符合規則的一定數量的topic的許可權。
    許可權分類:


    • write
    • read
    • writeread

叢集配置

Moquette的叢集配置實用的是hazelcast。Hazelcast是基於java編寫的資料同步工具。在moquette中,用於不同節點訊息的同步。

<network>
<public-address>IP1:5701</public-address>
<port>5701</port>
<join>
       <multicast enabled="false" />
       <tcp-ip enabled="true">
              <required-member>IP2:5701</required-member>
       </tcp-ip>
</join>
</network>

public-address:代表了當前節點的IP及埠
required-member:代表了叢集中的其他節點。
各節點的叢集模式建立後,各節點是對等關係,無主從之分

使用者管理

該檔案用於系統的可登入使用者,例項格式如下:

#*********************************************
# Each line define a user login in the format
#   <username>:sha256(<password>)
#*********************************************
#NB this password is sha256(passwd)
admin:8d969eef6ecad3c29a3a629280e686cf0c3f5d5a86aff3ca12020c923adc6c92
client:8d969eef6ecad3c29a3a629280e686cf0c3f5d5a86aff3ca12020c923adc6c92

該檔案的格式非常簡單:
每行代表了一個使用者及其密碼,用:分割,密碼是sha256摘要後的結果。

關於client(消費者)所使用的使用者,大部分情況下,client只需要其clientId來區分,因此後臺可針對業務型別建立不同的使用者分給client使用,不需要為每個clientId都建立使用者。

主配置

主配置中包含了較多內容,介紹如下:

1.埠

port 1883
websocket_port 8383

port 1883 是broker的主埠,預設為MQTT協議的1883埠
由於系統提供了websocket功能,可以使用websocket的方式使用(該模式未進行測試)。

2. SSL埠及配置

# ssl_port 8883
#jks_path serverkeystore.jks
#key_store_password passw0rdsrv
#key_manager_password passw0rdsrv

對於有較高安全要求的系統,可以新增SSL支援。

3.IP繫結限制

#*********************************************************************
# The interface to bind the server
#  0.0.0.0 means "any"
#*********************************************************************
host 0.0.0.0

4.儲存設定

storage_class io.moquette.persistence.redis.RedisStorageService
由於基於不同儲存的實現的效能,差異性較大,moquette預設採用記憶體儲存的模式,該模式有很高的效能,但存在單點崩潰下,訊息丟失的風險(由於叢集負載的使用,可降低該問題發生的影響範圍)。
如果對儲存過於看重,效能可求次,可使用基於redis的儲存實現,其自帶的mapdb的儲存實現,錯誤較多。
在不設定的情況,預設採用的基於memory的儲存實現。

5.啟用許可權訪問

#*********************************************************************
# acl_file:
#    defines the path to the ACL file relative to moquette home dir
#    contained in the moquette.path system property
#*********************************************************************
acl_file config/acl.conf

以上代表了,broker將以acl.conf中的內容為基礎進行授權鑑權。

6.是否允許匿名訪問

#*********************************************************************
# allow_anonymous is used to accept MQTT connections also from not
# authenticated clients.
#   - false to accept ONLY client connetions with credentials.
#   - true to accept client connection without credentails, validating
#       only against the password_file, the ones that provides.
#*********************************************************************
allow_anonymous false

以上代表不允許匿名訪問,必須使用使用者名稱及密碼才可以訪問。

7.使用者密碼檔案配置

#*********************************************************************
# password_file:
#    defines the path to the file that contains the credentials for
#    authenticated client connection. It's relative to moquette home dir
#    defined by the system property moquette.path
#*********************************************************************
password_file config/password_file.conf

以上代表broker將使用password檔案進行鑑權,若不需要則可以將其註釋掉。

8.epoll的啟用

#*********************************************************************
# Netty Configuration
#*********************************************************************
#
# Linux systems can use epoll instead of nio. To get a performance
# gain and reduced GC.
# http://netty.io/wiki/native-transports.html for more information
#
netty.epoll true

在linux系統下,提供的epoll機制,可使系統能夠承載更高的終端。以上代表啟用epoll。在機器硬體較好的情況下,epoll模式提升明顯。

9.叢集配置

#hazelcast
#intercept.handler io.moquette.interception.HazelcastInterceptHandler

叢集配置的情況下,需要開啟以上配置,開啟配置的前提是hazelcast.xml檔案已配置。

10.Redis配置

#redis storage
redis.host localhost
redis.port 6379
redis.password
redis.database 0
redis.prefix monitor:

在store_class已經配置為redis的情況下,需要配置以上引數,由於叢集模式使用hazelcast,目前的基於redis的實現,不具備分片等功能,但鍵值的設計已經具備。

2.2 啟動

Moquette程式碼工程採用maven管理,採用maven install可以打包一個在linux下執行的檔案,打包後的格式如下:

  • Lib目錄是所有使用到的lib檔案,分為:
    這裡寫圖片描述

    1. Netty相關
    2. Hazelcast
    3. Log相關
    4. Redis儲存實現引入的lib:在使用memory的模式下,redis相關可以刪除,減少包的大小。
  • Bin目錄:
    這裡寫圖片描述

  • linux下的moquette.sh啟動方式:
    預設不是以後臺執行的方式,需要使用以下命令執行:

linux下的moquette.sh啟動方式:
預設不是以後臺執行的方式,需要使用以下命令執行:

 setsid  ./moquette.sh &

nohup命令模式會找不到配置好的log輸出。
Windows下以bat命令執行

2.3 客戶端

實現moqtt協議的客戶端存在很多種,針對該broker,目前測試使用的是eclipse-phao的,該客戶端實現提供了多種語言版本,便於不同終端使用。

針對不同的語言版本,可提供的功能存在不同,目前broker預設沒有實現除mqtt協議規範中提到的功能。

重連機制,需要客戶端想法實現該機制,避免客戶端掉線後只能重啟才能連結的境界。

image

3. 測試

該broker經過了多次測試。
測試場景:

1.機器配置

記憶體 CPU
8G 4核

Client所執行機器多樣,每臺機器執行5000個client。
Publish為普通windows機器,兩個publisher,每個5個傳送執行緒,平均每秒100條訊息。

2.訊息傳送速度

  • 10秒一條群發的情況下,測試3論。
  • 每秒100條點對點的訊息情況下,測試3輪。
  • 每輪測試20到30幾個小時。

3.客戶端情況

Client在clean Session的情況下,broker的記憶體佔用較低,僅400M左右
在不清理會話的情況下,記憶體佔用較高,在client大批量反覆掉線重連情況下記憶體佔用達到2G
心跳設定60s,過短(低於30s)的心跳,對broker來說不能承受。

4叢集情況

搭建了兩個節點的叢集,通過nginx進行tcp負載,客戶端測試數量為3萬。

3.1承載量測試

1.單broker從8000,15000,18000,25000幾個級別的測試,在不傳送訊息的情況下,這幾個級別的客戶端都可以連線。

承載量 結果
8000 OK
15000 OK
18000 OK
25000 OK

2.在傳送訊息的情況

承載量 結果
8000 OK
15000 OK
18000 NG
25000 NG

在每10秒一條群發訊息的情況下,單點broker的無掉線承載量是15000,18000發生較多的掉線情況。

3.2訊息接收速度

在10秒一條群發訊息及每秒100條點對點訊息的傳送情況下,訊息的接受速度都在1秒以內。

3.3所佔記憶體

基於實現分析,記憶體的佔用主要由client在不清理會話進行連結掉線後產生的訊息積累,在原有基於記憶體的實現機制中,為每個client儲存1024條訊息,在超過1024條後,訊息會導致publish端出錯。修改後的實現,為每個離線client儲存最新的32條訊息,超過32條的將被丟棄。

基於redis的實現,訊息目前沒有設定棄用或過期機制。

測試期間的記憶體分析:
IP1的broker,總記憶體佔用情況如下
image
image

記憶體穩定在800M左右,處於穩定狀態。

3.4注意問題

基於記憶體的儲存實現,目前僅儲存32條離線訊息,超過32條將丟棄原有的。

遺願訊息內容必須為ascii,不能為其他字元。遺願訊息主要用於客戶端掉線後的處理。

客戶端的心跳不能設定過小,否則broker的承載量將嚴重下降,建議60s以上。

遺留的問題:

  • 在心跳之間的時間段,測試發現存在broker誤簽收的情況。
  • 以上問題,在業務實際使用過程中,採取業務簽收等方式,避免訊息質量的不可靠性的出現。