MQTT訊息訂閱與解析
阿新 • • 發佈:2019-01-11
MQTT訊息訂閱與解析
-
通過配置檔案建立mqtt客戶端
1.1 xml檔案中新增mqtt客戶端配置項
<!-- mqtt客戶端 --> <bean id="clientFactory" class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory"> <property name="userName" value="test" /> <property name="password" value="test" /> </bean> <!-- 訊息介面卡 --> <int-mqtt:message-driven-channel-adapter id="mqtttest" client-id="mqttTest" url="tcp://MQTT伺服器地址:1883" topics="/data/#" qos="2" client-factory="clientFactory" auto-startup="true" send-timeout="12" channel="startCase" /> <int:channel id="startCase" /> <!-- 訊息處理類 --> <int:service-activator id="startCaseService" input-channel="startCase" ref="mqttCaseService" method="messageArrived" /> <bean id="mqttCaseService" class="com.iamapsycho.mqtt.DataHandler" />
1.2 建立訊息解析的類DataHandler
public class DataHandler{ public void messageArrived(Message<?> message) throws IllegalAccessException, IllegalArgumentException, InvocationTargetException, NoSuchMethodException, SecurityException { //獲取訊息體訂閱的主題 final String topic = (String) message.getHeaders().get("mqtt_topic"); //獲取訊息體中的訊息 String contentStr = message.getPayload().toString(); //訊息轉json JSONObject jo = JSONObject.parseObject(contentStr); // TODO Auto-generated method stub // 根據具體情況新增解析訊息的方法 }
-
自定義mqtt客戶端並建立連線
2.1 xml檔案中注入bean
<!-- mqttDataHandler mqtt構造類 --> <bean id="mqttDataHandler" class="com.iamapsycho.mqtt.MqttDataHandler"></bean> <!-- 自定義監聽springmvc啟動完成執行內部方法 --> <bean id="mqttBeanListener" class="com.iamapsycho.mqtt.listener.MqttBeanListener"></bean>
2.2 宣告MqttDataHandler類
public class MqttDataHandler implements MqttCallback{ public static Logger logger = Logger.getLogger(MqttDataHandler.class); Service1 service1; Service2 service2; Service3 service3; Service4 service4; MqttBeanListener mqttBeanListener; public MqttDataHandler() {} /** * 建構函式 * @param service1 * @param service2 * @param service3 * @param service4 * @param mqttBeanListener */ public MqttDataHandler(Service1 service1, Service2 service2, Service3 service3, Service4 service4, MqttBeanListener mqttBeanListener) { super(); this.service1= service1; this.service2= service2; this.service3= service3; this.service4= service4; this.mqttBeanListener = mqttBeanListener; } /** * 連線丟失處理方法 */ @Override public void connectionLost(Throwable cause) { // 連線丟失後,一般在這裡面進行重連 logger.info("連線斷開了,準備進行重連"); while (true){ try {//如果沒有發生異常說明連線成功,如果發生異常,則死迴圈 Thread.sleep(6000); logger.info("正在嘗試重連"); mqttBeanListener.connect(); mqttBeanListener.autoSubsrcribe(); break; }catch (Exception e){ continue; } } } /** * 訊息處理方法 */ @Override public void messageArrived(String topic, MqttMessage message) throws Exception { String eqno = topic.split("/")[2]; String contentStr = new String(message.getPayload()); JSONObject jo = JSONObject.parseObject(contentStr); // TODO Auto-generated method stub // 根據具體情況新增解析訊息的方法 } @Override public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("deliveryComplete---------" + token.isComplete()); } }
2.3 宣告MqttBeanListener類
/**
*
* @TODO: Spring/SpringMVC在啟動完成後執行方法
*/
public class MqttBeanListener implements ApplicationListener<ContextRefreshedEvent> {
//ContextRefreshedEvent為初始化完畢事件,spring還有很多事件可以利用
@Value("#{prop.mqttServerURI}")
private String mqttServerURI; //在配置檔案中定義,也可以在類中寫死
@Value("#{prop.mqttUsername}")
private String mqttUsername;//在配置檔案中定義,也可以在類中寫死
@Value("#{prop.mqttPassword}")
private String mqttPassword;//在配置檔案中定義,也可以在類中寫死
@Autowired
Service1 service1;
@Autowired
Service2 service2;
@Autowired
Service3 service3;
@Autowired
Service4 service4;
private static MqttClient sampleClient ;
MqttDataHandler mqttDataHandler;
@Override
public void onApplicationEvent(ContextRefreshedEvent ev) {
// 防止重複執行。
if (ev.getApplicationContext().getParent() == null) {
//需要執行的邏輯程式碼,當spring容器初始化完成後就會執行該方法。
this.createMqttClient();
//訂閱預設的主題
this.autoSubsrcribe();
}
}
/**
*
* @TODO: mqtt連結
* @throws MqttException
*/
public void connect() throws MqttException {
MemoryPersistence persistence = new MemoryPersistence();
mqttDataHandler = new MqttDataHandler(service1, service2,
service3, service4, MqttBeanListener.this);
//防止重複建立MQTTClient例項
if (sampleClient==null) {
String clientId = IDUtils.createUUID().toString();
sampleClient = new MqttClient(mqttServerURI, clientId, persistence);
sampleClient.setCallback(mqttDataHandler);
}
MqttConnectOptions options = getOptions();
//判斷攔截狀態,這裡注意一下,如果沒有這個判斷,是非常坑的
if (!sampleClient.isConnected()) {
sampleClient.connect(options);
System.out.println("連線成功");
}else {
//這裡的邏輯是如果連線成功就重新連線
sampleClient.disconnect();
sampleClient.connect(getOptions());
System.out.println("連線成功");
}
}
/**
* @TODO(生成配置物件,使用者名稱,密碼等)
* @return MqttConnectOptions
*/
public MqttConnectOptions getOptions() {
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(false);
connOpts.setUserName(mqttUsername);
connOpts.setPassword(mqttPassword.toCharArray());
//超時設定
connOpts.setConnectionTimeout(10);
return connOpts;
}
/**
* @author iamapsycho
* @TODO 建立Mqtt客戶端
*/
public void createMqttClient() {
MemoryPersistence persistence = new MemoryPersistence();
try {
MqttConnectOptions connOpts = getOptions();
mqttDataHandler = new MqttDataHandler(service1, service2,
service3, service4, MqttBeanListener.this);
String clientId = IDUtils.createUUID().toString();
sampleClient = new MqttClient(mqttServerURI, clientId, persistence);
sampleClient.setCallback(mqttDataHandler);
sampleClient.connect(connOpts);
} catch (MqttException me) {
me.printStackTrace();
}
}
/**
* 訂閱主題
*/
public void subsrcribe(String topic) {
try {
//設定訊息級別
int Qos=0;
sampleClient.subscribe(topic,Qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* @author iamapsycho
* 訂閱 data/upload/equNo,
* 其中equipmentNoList代表資料庫中裝置編號
*/
public void autoSubsrcribe() {
List<String> equNoList = equipmentManageService.getEquipmentNoList();
String uploadTopic = "data/upload/";
for(String equNo: equipmentNoList) {
this.subsrcribe(equNo, uploadTopic);
}
}
}
後期需要優化的地方
在新增新裝置的同時,要對相應的裝置進行訂閱