1. 程式人生 > >從 Modbus 到 Web 資料視覺化之 WebSocket 實時訊息

從 Modbus 到 Web 資料視覺化之 WebSocket 實時訊息

前言

  工業物聯網是一個範圍很大的概念,本文從資料視覺化的角度介紹了一個最小化的工業物聯網平臺,從 Modbus 資料採集到前端資料視覺化呈現的基本實現思路。這裡面主要涉及基於 Modbus 通訊規約的資料採集、後臺實時資料處理、前端實時資料接收、前端實時資料視覺化顯示。物聯網平臺架構主要參考了圖撲物聯工業物聯網平臺,並從中提取了部分功能進行介紹,前端資料視覺化採用的是

  由於內容比較多,具體實現上涉及到前端工程師、後臺工程師、資料採集工程師等多個開發角色的參與,所以本文重點介紹實現思路和 WebSocket 訊息推送的實現,其它環節的具體實現細節作者會在其它文章中進行詳細介紹。

一、物聯網平臺架構

  物聯網平臺主要是B/S模式,工業物聯網平臺大都採用的是微服務架構,本文主要涉及兩個微服務:前置資料採集服務和 Web 實時訊息推送服務。

  前置資料採集服務主要用於現場裝置、儀器、儀表、感測器實時資料的採集,IoTopo工業物聯網平臺支援MQTT和透傳雲解析兩種方式,透傳雲解析支援 Modbus 通訊規約。

  實時資料採集到平臺後,需要推送到瀏覽器端進行顯示,Web 實時訊息推送服務採用 Web Socket 進行實時資料推送,可以確保資料的實時性和高效性。

  前端視覺化技術採用的是HT for Web, HT for Web 是基於HTML5標準的企業應用圖形介面一站式解決方案,其包含通用元件、拓撲元件和3D渲染引擎等豐富的圖形介面開發類庫。雖然 HT for Web 是商業軟體但其提供的一站式解決方案可以極大縮短產品開發週期、減少研發成本、補齊我們在 Web 圖形介面視覺化技術上的短板。

二、Modbus 資料採集

Modbus是一種序列通訊協議,是Modicon公司(現在的施耐德電氣Schneider Electric)於1979年為使用可程式設計邏輯控制器(PLC)通訊而發表。Modbus已經成為工業領域通訊協議的業界標準,並且現在是工業電子裝置之間常用的連線方式。Modbus比其他通訊協議使用的更廣泛的主要原因有:

  1. 公開發表並且無版權要求
  2. 易於部署和維護
  3. 對供應商來說,修改移動本地的位元或位元組沒有很多限制

Modbus允許多個 (大約240個) 裝置連線在同一個網路上進行通訊,舉個例子,一個由測量溫度和溼度的裝置,並且將結果傳送給計算機。在資料採集與監視控制系統(SCADA)中,Modbus通常用來連線監控計算機和遠端終端控制系統(RTU)。

目前主流的編輯語言都有 Modbus 開發庫,由於 Modbus 相對比較簡單,很多企業也選擇自行開發實現。Modbus 資料採集屬於後臺通訊,資料採集到平臺後首先會進行資料清理和預處理,過濾掉冗餘和無效資料,形成實時資料。平臺獲取到實時資料後一般會做 3 項工作:

1. 推送到 Web 前端進行顯示

2. 儲存到時序資料庫

3. 判斷是否產生告警

 

三、將實時資料推送到 Web 前端

  基於 Web 的實時資料推送需要用到 WebSocket,初學者可以學習阮一峰老師的 WebSocket 教程。我們基於 WebSocket 封裝了一套訊息傳輸協議,類似於一個訊息中介軟體,前端部分可以訂閱實時資料。考慮到海量實時資料的推送需求,將實時資料分為平臺級、站點級、裝置級,前端在訂閱實時資料時,可以通過訊息主題規則訂閱不同級別的資料。平臺側在收到訂閱請求時,可以主動推送一次實時資料。這樣可以確保資料視覺化介面在訂閱實時資料成功後,第一時間顯示出正確的介面。

  下面給出一個簡化的 WebSocket 訊息協議的客戶端程式碼,大家可以在些基礎上進行改造以適合自己的業務場景。

  訊息主題正則表示式,用來匹配訊息主題:

1 const matchWildcard = function(str, rule) {
2     return new RegExp('^' + rule.split('*').join('.*') + '$').test(str)
3 }

  WebSocket 客戶端,支援訊息主題訂閱、取消訊息主題訂閱、同一個訊息主題支援多個訂閱者:

  1 class WebSocketClient {
  2     constructor() {
  3         this.ws = null
  4         this.opts = {
  5             debug: false,
  6             autoReconnect: true,
  7             reconnectInterval: 10000,
  8             subscriber: {},
  9         }
 10         this.opened = false
 11     }
 12 
 13     connect() {
 14         if (!this.opened) {
 15             return
 16         }
 17 
 18         const url = 'ws://www.iotopo.com/msg/v1'
 19         console.debug('websocket connect', url)
 20 
 21         let ws = this.ws = new WebSocket(url)
 22         ws.onmessage = event => {
 23             if (this.opts.debug) {
 24                 console.log(event)
 25             }
 26             let data = JSON.parse(event.data)
 27 
 28             for (let topic in this.opts.subscriber) {
 29                 if (matchWildcard(data.topic, topic)) {
 30                     let listeners = this.opts.subscriber[topic]
 31                     if (Array.isArray(listeners)) {
 32                         listeners.forEach(cb => {
 33                             if (typeof cb === 'function') {
 34                                 cb(data.payload)
 35                             }
 36                         })
 37                     }
 38                 }
 39             }
 40         }
 41         ws.onopen = e => {
 42             if (this.opts.debug) {
 43                 console.log(e)
 44             }
 45             // 執行訂閱請求
 46             for (let topic in this.opts.subscriber) {
 47                 this._sendSubscribe(topic)
 48             }
 49             if (typeof this.opts.onopen === 'function') {
 50                 this.opts.onopen(e)
 51             }
 52         }
 53         ws.onclose = e => {
 54             if (this.opts.debug) {
 55                 console.log(e)
 56             }
 57             if (typeof this.opts.onclose === 'function') {
 58                 this.opts.onclose(e)
 59             }
 60             if (this.opened && this.opts.autoReconnect) {
 61                 setTimeout(() => {
 62                     this.connect()
 63                 }, this.opts.reconnectInterval)
 64             }
 65         }
 66         ws.onerror = e => {
 67             if (this.opts.debug) {
 68                 console.log(e)
 69             }
 70             if (typeof this.opts.onerror === 'function') {
 71                 this.opts.onerror(e)
 72             }
 73         }
 74     }
 75 
 76     open(opts) {
 77         if (!this.opened) {
 78             Object.assign(this.opts, opts || {})
 79             this.opened = true
 80             this.connect()
 81         }
 82     }
 83 
 84     close() {
 85         this.opened = false
 86         if (this.ws !== null) {
 87             this.ws.close()
 88         }
 89         this.ws = null
 90     }
 91 
 92     isOpened() {
 93         return this.opened
 94     }
 95 
 96     isConnected() {
 97         return this.ws !== null
 98     }
 99 
100     _sendSubscribe(topic) {
101         if (this.ws === null) {
102             return Error('websocet not opened')
103         }
104         if (typeof topic !== 'string') {
105             return Error('topic should be a string value')
106         }
107 
108         if (this.ws.readyState === WebSocket.OPEN) {
109             let msg = {
110                 type: 'subscribe',
111                 topic: topic,
112             }
113             this.ws.send(JSON.stringify(msg))
114         } else {
115             return Error('websocet not connected')
116         }
117     }
118 
119     subscribe(topic, cb) {
120         if (this.opts.debug) {
121             console.log('subscribe:', topic)
122         }
123         let listeners = this.opts.subscriber[topic]
124         if (!Array.isArray(listeners)) {
125             listeners = [
126                 cb
127             ]
128             this.opts.subscriber[topic] = listeners
129         } else {
130             listeners.push(cb)
131         }
132         this._sendSubscribe(topic)
133 
134         return { topic, cb }
135     }
136 
137     unsubscribe({topic, cb}) {
138         if (this.opts.debug) {
139             console.log('unsubscribe:', topic)
140         }
141 
142         if (this.ws === null) {
143             return Error('websocet not opened')
144         }
145 
146         if (typeof topic !== 'string') {
147             return Error('topic should be a string value')
148         }
149 
150         let listeners = this.opts.subscriber[topic]
151         if (cb) {
152             if (Array.isArray(listeners)) {
153                 let idx = listeners.indexOf(cb)
154                 if (idx >= 0) {
155                     listeners.splice(idx, 1)
156                 }
157             }
158         } else {
159             delete this.opts.subscriber[topic]
160         }
161 
162         if (Array.isArray(listeners) && listeners == 0) {
163             if (this.ws.readyState === WebSocket.OPEN) {
164                 let msg = {
165                     type: 'unsubscribe',
166                     topic: topic,
167                 }
168                 this.ws.send(JSON.stringify(msg))
169             } else {
170                 return Error('websocet not connected')
171             }
172         }
173     }
174 }

  用法舉例:

 1 // 初始化客戶端
 2 const ws = new WebSocketClient()
 3 // 與 WebSocket 伺服器建議連線
 4 ws.open({
 5     debug: false
 6 })
 7 // 訂閱訊息
 8 ws.subscribe('/foo/bar/*', function(msg) {
 9     console.log('recv ws msg:', msg)
10 })

四、資料視覺化介面實現

  基於 HT for Web 可以簡單快速地搭建一個符合 HTML5 標準的視覺化圖形介面,通過 WebSocket 訂閱實時資料,然後驅動圖形介面的變化。資料驅動圖形介面變化的實現方式很多,基本方法是採用資料繫結的方式,具體可以參考 HT for Web 的官方文件

在後面的文章中,作者會介紹一種基於 HT for Web 實現的業務資料和圖形資料分離的資料繫結方法。