1. 程式人生 > >SpringBoot 整合MQTT配置

SpringBoot 整合MQTT配置

[TOC] ## 1. 前言 公司的[IOT平臺](https://www.cnblogs.com/itdragon/)主要採用MQTT(訊息佇列遙測傳輸)對底層的驅動做命令下發和資料採集。也用到了redis、zeroMQ、nats等訊息中介軟體。今天先整理[SpringBoot整合MQTT](https://www.cnblogs.com/itdragon/)筆記和工作中遇到的問題。 ## 2. MQTT介紹 > *MQTT is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport. It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium.* 官網地址:
MQTT除了具備大部分訊息中介軟體擁有的功能外,其最大的特點就是小型傳輸。以減少開銷,減低網路流量的方式去滿足低頻寬、不穩定的網路遠端傳輸。 MQTT伺服器有很多,比如Apache-Apollo和EMQX,[ITDragon龍](https://www.cnblogs.com/itdragon/) 目前使用的時EMQX作為MQTT的伺服器。使用也很簡單,下載解壓後,進入bin目錄執行emqx console 啟動服務。 MQTT除錯工具可以用MQTTBox ## 3. SpringBoot 整合MQTT ### 3.1 匯入mqtt庫 第一步:匯入面向企業應用整合庫和對應mqtt整合庫 ```groovy compile('org.springframework.boot:spring-boot-starter-integration') compile('org.springframework.integration:spring-integration-mqtt') ``` 這裡要注意spring-integration-mqtt的版本。因為會存在org.eclipse.paho.client.mqttv3修復了一些bug,並迭代了新版本。但spring-integration-mqtt並沒有及時更新的情況。修改方法如下 ```groovy compile("org.springframework.integration:spring-integration-mqtt") { exclude group: "org.eclipse.paho" , module: "org.eclipse.paho.client.mqttv3" } compile("org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.2") ``` 第二步:MQTT連線配置檔案 ```properties # MQTT Config mqtt.server=tcp://x.x.x.x:1883 mqtt.username=xxx mqtt.password=xxx mqtt.client-id=clientID mqtt.cache-number=100 mqtt.message.topic=itDragon/tags/cov ``` ### 3.2 配置MQTT訂閱者 [Inbound 入站訊息介面卡](https://www.cnblogs.com/itdragon/) 第一步:配置MQTT客戶端工廠類DefaultMqttPahoClientFactory 第二步:配置MQTT入站訊息介面卡MqttPahoMessageDrivenChannelAdapter 第三步:定義MQTT入站訊息通道MessageChannel 第四步:宣告MQTT入站訊息處理器MessageHandler **以下有些配置是衝突或者重複的,主要是體現一些重要配置。** ```kotlin package com.itdragon.server.config import com.itdragon.server.message.ITDragonMQTTMessageHandler import org.eclipse.paho.client.mqttv3.MqttConnectOptions import org.springframework.beans.factory.annotation.Value import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.integration.annotation.ServiceActivator import org.springframework.integration.channel.DirectChannel import org.springframework.integration.core.MessageProducer import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory import org.springframework.integration.mqtt.core.MqttPahoClientFactory import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter import org.springframework.messaging.MessageChannel import org.springframework.messaging.MessageHandler import java.time.Instant @Configuration class MQTTConfig { @Value("\${mqtt.server}") lateinit var mqttServer: String @Value("\${mqtt.user-name}") lateinit var mqttUserName: String @Value("\${mqtt.password}") lateinit var mqttUserPassword: String @Value("\${mqtt.client-id}") lateinit var clientID: String @Value("\${mqtt.cache-number}") lateinit var maxMessageInFlight: String @Value("\${mqtt.message.topic}") lateinit var messageTopic: String /** * 配置DefaultMqttPahoClientFactory * 1. 配置基本的連結資訊 * 2. 配置maxInflight,在mqtt訊息量比較大的情況下將值設大 */ fun mqttClientFactory(): MqttPahoClientFactory { val mqttConnectOptions = MqttConnectOptions() // 配置mqtt服務端地址,登入賬號和密碼 mqttConnectOptions.serverURIs = arrayOf(mqttServer) mqttConnectOptions.userName = mqttUserName mqttConnectOptions.password = mqttUserPassword.toCharArray() // 配置最大不確定接收訊息數量,預設值10,qos!=0 時生效 mqttConnectOptions.maxInflight = maxMessageInFlight.toInt() val factory = DefaultMqttPahoClientFactory() factory.connectionOptions = mqttConnectOptions return factory } /** * 配置Inbound入站,消費者基本連線配置 * 1. 通過DefaultMqttPahoClientFactory 初始化入站通道介面卡 * 2. 配置超時時長,預設30000毫秒 * 3. 配置Paho訊息轉換器 * 4. 配置傳送資料的服務質量 0~2 * 5. 配置訂閱通道 */ @Bean fun itDragonMqttInbound(): MessageProducer { // 初始化入站通道介面卡,使用的是Eclipse Paho MQTT客戶端庫 val adapter = MqttPahoMessageDrivenChannelAdapter(clientID + Instant.now().toEpochMilli(), mqttClientFactory(), messageTopic) // 設定連線超時時長(預設30000毫秒) adapter.setCompletionTimeout(30000) // 配置預設Paho訊息轉換器(qos=0, retain=false, charset=UTF-8) adapter.setConverter(DefaultPahoMessageConverter()) // 設定服務質量 // 0 最多一次,資料可能丟失; // 1 至少一次,資料可能重複; // 2 只有一次,有且只有一次;最耗效能 adapter.setQos(0) // 設定訂閱通道 adapter.outputChannel = itDragonMqttInputChannel() return adapter } /** * 配置Inbound入站,消費者訂閱的訊息通道 */ @Bean fun itDragonMqttInputChannel(): MessageChannel { return DirectChannel() } /** * 配置Inbound入站,消費者的訊息處理器 * 1. 使用@ServiceActivator註解,表明所修飾的方法用於訊息處理 * 2. 使用inputChannel值,表明從指定通道中取值 * 3. 利用函數語言程式設計的思路,解耦MessageHandler的業務邏輯 */ @Bean @ServiceActivator(inputChannel = "itDragonMqttInputChannel") fun commandDataHandler(): MessageHandler { /*return MessageHandler { message ->
println(message.payload) }*/ return ITDragonMQTTMessageHandler() } } ``` 注意: - 1)MQTT的客戶端ID要唯一。 - 2)MQTT在訊息量大的情況下會出現訊息丟失的情況。 - 3)MessageHandler注意解耦問題。 ### 3.3 配置MQTT釋出者 [Outbound 出站訊息介面卡](https://www.cnblogs.com/itdragon/) 第一步:配置Outbound出站,出站通道介面卡 第二步:配置Outbound出站,釋出者傳送的訊息通道 第三步:對外提供推送訊息的介面 在原有的MQTTConfig配置類的整合上補充以下內容 ```kotlin /** * 配置Outbound出站,出站通道介面卡 * 1. 通過MqttPahoMessageHandler 初始化出站通道介面卡 * 2. 配置非同步傳送 * 3. 配置預設的服務質量 */ @Bean @ServiceActivator(inputChannel = "itDragonMqttOutputChannel") fun itDragonMqttOutbound(): MqttPahoMessageHandler { // 初始化出站通道介面卡,使用的是Eclipse Paho MQTT客戶端庫 val messageHandler = MqttPahoMessageHandler(clientID + Instant.now().toEpochMilli() + "_set", mqttClientFactory()) // 設定非同步傳送,預設是false(傳送時阻塞) messageHandler.setAsync(true) // 設定預設的服務質量 messageHandler.setDefaultQos(0) return messageHandler } /** * 配置Outbound出站,釋出者傳送的訊息通道 */ @Bean fun itDragonMqttOutputChannel(): MessageChannel { return DirectChannel() } /** * 對外提供推送訊息的介面 * 1. 使用@MessagingGateway註解,配置MQTTMessageGateway訊息推送介面 * 2. 使用defaultRequestChannel值,呼叫時將向其傳送訊息的預設通道 * 3. 配置靈活的topic主題 */ @MessagingGateway(defaultRequestChannel = "itDragonMqttOutputChannel") interface MQTTMessageGateway { fun sendToMqtt(data: String, @Header(MqttHeaders.TOPIC) topic: String) fun sendToMqtt(data: String, @Header(MqttHeaders.QOS) qos: Int, @Header(MqttHeaders.TOPIC) topic: String) } ``` 注意: * 1)釋出者和訂閱者的客戶端ID不能相同。 * 2)訊息的推送建議採用非同步的方式。 * 3)訊息的推送方法可以只傳payload訊息體,但需要配置setDefaultTopic。 ### 3.4 MQTT訊息處理和傳送 #### 3.4.1 訊息處理 為了讓訊息處理函式和MQTT配置解耦,這裡提供MessageHandler 註冊類,將訊息處理的業務邏輯以函數語言程式設計的思維註冊到Handler中。 ```kotlin package com.itdragon.server.message import org.springframework.messaging.Message import org.springframework.messaging.MessageHandler class ITDragonMQTTMessageHandler : MessageHandler { private var handler: ((String) ->
Unit)? = null fun registerHandler(handler: (String) -> Unit) { this.handler = handler } override fun handleMessage(message: Message<*>) { handler?.run { this.invoke(message.payload.toString()) } } } ``` 註冊MessageHandler ```kotlin package com.itdragon.server.message import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service import javax.annotation.PostConstruct @Service class ITDragonMessageDispatcher { private val logger = LoggerFactory.getLogger(ITDragonMessageDispatcher::class.java) @Autowired lateinit var itDragonMQTTMessageHandler: ITDragonMQTTMessageHandler @PostConstruct fun init() { itDragonMQTTMessageHandler.registerHandler { itDragonMsgHandler(it) } } fun itDragonMsgHandler(message: String) { logger.info("itdragon mqtt receive message: $message") try { // todo }catch (ex: Exception) { ex.printStackTrace() } } } ``` #### 3.4.1 訊息傳送 注入MQTT的MessageGateway,然後推送訊息。 ```kotlin @Autowired lateinit var mqttGateway: MQTTConfig.MQTTMessageGateway @Scheduled(fixedDelay = 10*1000) fun sendMessage() { mqttGateway.sendToMqtt("Hello ITDragon ${Instant.now()}", "itDragon/tags/cov/set") } ``` ## 4. 開發常見問題 ### 4.1 MQTT每次重連失敗都會增長執行緒數 專案上線一段時間後,客戶的伺服器嚴重卡頓。原因是客戶服務斷網後,MQTT在每次嘗試重連的過程中一直在建立新的執行緒,導致一個Java服務建立了上萬個執行緒。解決方案是更新了org.eclipse.paho.client.mqttv3的版本,也是 "3.1 匯入mqtt庫" 中提到的。後續就沒有出現這個問題了。 ### 4.2 MQTT訊息量大存在訊息丟失的情況 MQTT的訊息量大的情況下,既要保障資料的完整,又要保障效能的穩定。光從MQTT本身上來說,很難做到魚和熊掌不可兼得。[ITDragon龍](https://www.cnblogs.com/itdragon/) 先要理清需求: * 1)資料的完整性,主要用於能耗的統計、報警的分析 * 2)效能的穩定性,伺服器不掛