1. 程式人生 > >基於MQTT協議的 org.eclipse.paho.client.mqttv3 原始碼學習(二)

基於MQTT協議的 org.eclipse.paho.client.mqttv3 原始碼學習(二)

一、主要類介紹


二、重點類程式碼分析

對於長連線,一般是直接從訊息的接收和傳送類開始讀,上面知道paho中訊息傳送和接收是在CommsSender和CommsReceiver實現的,

所以直接差看CommsSender程式碼。

[java]  view plain  copy
  1. public void run() {  
  2.         final
     String methodName = "run";  
  3.         MqttWireMessage message = null;  
  4.         while (running && (out != null)) {  
  5.             try {  
  6.                 message = clientState.get();  
  7.   
  8.                 log("sender 802:begin->"
     + message.toString());  
  9.                 if (message != null) {  
  10.                     // @TRACE 802=network send key={0} msg={1}  
  11.                     log.fine(className, methodName, "802"new Object[] { message.getKey(), message });  
  12.   
  13.                     if (message instanceof MqttAck) {  
  14.                         out.write(message);  
  15.                         out.flush();  
  16.                     } else {  
  17.                         MqttToken token = tokenStore.getToken(message);  
  18.                         // While quiescing the tokenstore can be cleared so need  
  19.                         // to check for null for the case where clear occurs  
  20.                         // while trying to send a message.  
  21.                         if (token != null) {  
  22.                             synchronized (token) {  
  23.                                 out.write(message);  
  24.                                 try {  
  25.                                     out.flush();  
  26.                                 } catch (IOException ex) {  
  27.                                     // The flush has been seen to fail on  
  28.                                     // disconnect of a SSL socket  
  29.                                     // as disconnect is in progress this should  
  30.                                     // not be treated as an error  
  31.                                     if (!(message instanceof MqttDisconnect))  
  32.                                         throw ex;  
  33.                                 }  
  34.                                 clientState.notifySent(message);  
  35.                             }  
  36.                         }  
  37.                     }  
  38.                     log("sender 805:send success.");  
  39.                 } else { // null message  
  40.                     // @TRACE 803=get message returned null, stopping}  
  41.                     log.fine(className, methodName, "803");  
  42.   
  43.                     running = false;  
  44.                     log("sender 805:send empty.");  
  45.                 }  
  46.             } catch (MqttException me) {  
  47.                 log("sender 804:MqttException-> " + me.getLocalizedMessage());  
  48.                 handleRunException(message, me);  
  49.             } catch (Exception ex) {  
  50.                 log("sender 804:exception-> " + ex.getLocalizedMessage());  
  51.                 handleRunException(message, ex);  
  52.             }  
  53.         } // end while  
  54.   
  55.         // @TRACE 805=<  
  56.         log.fine(className, methodName, "805");  
  57.   
  58.     }  

程式碼可以看到,是直接一個執行緒無效迴圈獲取訊息然後傳送,
message = clientState.get();進入檢視訊息獲取程式碼

[java]  view plain  copy
  1. protected MqttWireMessage get() throws MqttException {  
  2.         final String methodName = "get";  
  3.         MqttWireMessage result = null;  
  4.   
  5.         synchronized (queueLock) {  
  6.             while (result == null) {  
  7.                 // If there is no work wait until there is work.  
  8.                 // If the inflight window is full and no flows are pending wait until space is freed.  
  9.                 // In both cases queueLock will be notified.  
  10.                 if ((pendingMessages.isEmpty() && pendingFlows.isEmpty()) ||   
  11.                     (pendingFlows.isEmpty() && actualInFlight >= this.maxInflight)) {  
  12.                     try {  
  13.                         long ttw = getTimeUntilPing();  
  14.                         //@TRACE 644=wait for {0} ms for new work or for space in the inflight window   
  15.                         log.fine(className,methodName, "644"new Object[] {new Long(ttw)});                          
  16.    
  17.                         queueLock.wait(getTimeUntilPing());  
  18.                     } catch (InterruptedException e) {  
  19.                     }  
  20.                 }  
  21.                   
  22.                 // Handle the case where not connected. This should only be the case if:   
  23.                 // - in the process of disconnecting / shutting down  
  24.                 // - in the process of connecting  
  25.                 if (!connected &&   
  26.                     (pendingFlows.isEmpty() || !((MqttWireMessage)pendingFlows.elementAt(0instanceof MqttConnect))) {  
  27.                     //@TRACE 621=no outstanding flows and not connected  
  28.                     log.fine(className,methodName,"621");  
  29.   
  30.                     return null;  
  31.                 }  
  32.   
  33.                 // Check if there is a need to send a ping to keep the session alive.   
  34.                 // Note this check is done before processing messages. If not done first  
  35.                 // an app that only publishes QoS 0 messages will prevent keepalive processing  
  36.                 // from functioning.  
  37.                 checkForActivity();  
  38.                   
  39.                 // Now process any queued flows or messages  
  40.                 if (!pendingFlows.isEmpty()) {  
  41.                     // Process the first "flow" in the queue  
  42.                     result = (MqttWireMessage)pendingFlows.elementAt(0);  
  43.                     pendingFlows.removeElementAt(0);  
  44. 相關推薦

    基於MQTT協議org.eclipse.paho.client.mqttv3 原始碼學習()

    一、主要類介紹 二、重點類程式碼分析 對於長連線,一般是直接從訊息的接收和傳送類開始讀,上面知道paho中訊息傳送和接收是在CommsSender和CommsReceiver實現的, 所以直接差看CommsSender程式碼。 [ja

    基於MQTT協議org.eclipse.paho.client.mqttv3 原始碼學習(一)

    一、MQTT協議簡敘 MQTT 有以下特點: 使用釋出/訂閱訊息模式,提供一對多的訊息釋出,解除應用程式耦合 使用TCP/IP提供網路連線 有三種釋出訊息服務質量 “至多一次 Qos level=0”,訊息釋出完全依賴底層 TCP/IP 網路。會發生訊息丟失或重複。這

    MQTT協議實現Eclipse Paho學習總結一

    一、概述 遙測傳輸 (MQTT) 是輕量級基於代理的釋出/訂閱的訊息傳輸協議,設計思想是開放、簡單、輕量、易於實現。這些特點使它適用於受限環境。例如,但不僅限於此: 網路代價昂貴,頻寬低、不可靠。在嵌入裝置中執行,處理器和記憶體資源有限。 該協議的特點有: 使用

    MQTT協議實現Eclipse Paho學習總結

    來源 list 設計 cte soc flush play req log MQTT協議實現Eclipse Paho學習總結摘自:https://www.cnblogs.com/yfliufei/p/4383852.html 2015-04-01 14:57 by 辣椒醬,

    通過集群的方式解決基於MQTT協議的RabbitMQ消息收發

    其中 enc msg received 127.0.0.1 結束 技術 tms gpu 在完成了基於AMQP協議的RabbitMQ消息收發後,我們要繼續實現基於MQTT協議的RabbitMQ消息收發。 由於C#的RabbitMQ.Client包中只實現了基於AMQP協議的消

    Ubuntu環境下基於MQTT協議的mosquitto以及叢集的安裝和簡單使用

    本文就眾多MQTT-Server中的mosquitto的安裝進行講解。 一、下載。 對於Ubuntu系統,可以使用sudo apt-get 來安裝mosquitto,但是這種方法雖然簡單,但是對於配置檔案的修改和管理比較麻煩,配置檔案需要自己寫好然後啟動時載入,因此不太推

    基於MQTT協議進行應用開發

    來自:http://www.cnblogs.com/secondtononewe/p/6073089.html 官方協議有句如下的話來形容MQTT的設計思想: “It is designed for connections with remote locations

    Android訊息推送()--基於MQTT協議實現的推送功能

    前段時間公司需要android端的手機群推功能,我們就通過MQTT來實現了該功能。 MQTT的官網如下 http://mqtt.org/ 關於系統的主要架構就不詳述了。這關係的到職業道德問題,在這裡

    基於MQTT協議的Mosquitto的使用及libmosquitto客戶端程式設計

    概述:工作之餘學習mqtt,使用開源的Mosquitto實現mqtt的使用。如:broker(伺服器/代理),訊息訂閱及釋出。 系統環境:ubuntu14.04 1,MQTT簡介 MQTT 是一個輕量級協議,使用基於 TCP/IP 協議的釋出/訂閱

    MQTT協議之訂閱及釋出(使用paho-mqtt-clientmqttv3實現)

    另外一個MQTT釋出訂閱客戶端paho-mqtt-client或mqttv3採用回撥的方式實現訊息的接收,下面看一下實現: 1.訊息接收回調類 [java] view plain copy  print? package cn.smartslim.mqt

    eclipse 程序包org.apache.axis.client不存在

    clip 不存在 原因 org ips axis bubuko lips 分享   在普通的 java 項目中引入jar後運行報錯:程序包org.apache.axis.client不存在      原因是多引入了一個非jar文件log4j.properties,如下:

    MQTT Java客戶端Eclipse paho實現資料的傳送和接收

    MQTT(MQ Telemetry Transport)是IBM開發的一種網路應用層的協議 使用場景: 1、不可靠、網路頻寬小的網路 2、執行的裝置CPU、記憶體非常有限 特點: 1、基於釋出/訂閱模型的協議 2、他是二進位制協議,二進位制的特點就是緊湊、佔用

    mqtt協議 springboot2.0.4 mqttv3 釋出訂閱程式碼呼叫,mqtt斷線重連

    mqttv3 釋出訂閱程式碼呼叫 我用的是springboot2.0.4 直接上程式碼: pom.xml <dependency> <groupId>org.eclipse.paho</groupId>

    使用eclipse paho在java端實現MQTT訊息的收發(客戶端與服務端例項)

    一、MQTT(訊息佇列)簡介 MQTT(MQ Telemetry Transport)是IBM開發的一種網路應用層的協議,提供輕量級的,支援可釋出/可訂閱的的訊息推送模式,使裝置對裝置之間的短訊息通訊變得簡單,比如現在應用廣泛的低功耗感測器,手機、嵌入式計算機

    mqtt協議 springboot2.0.4 mqttv3 釋出訂閱程式碼呼叫,mqtt斷線重連

    上篇博文講了安裝和配置:https://blog.csdn.net/jianeng_Love_IT/article/details/83061717 mqttv3 釋出訂閱程式碼呼叫 我用的是springboot2.0.4 直接上程式碼: pom.xml <depend

    8226基於nodemcu通過mqtt協議傳送接收訊息

    一過完年即將陷入一場惡戰之中,可能無暇在玩這麼多東西了,趁著過年把之前一直想搞的mqtt協議給玩一玩 。簡單說一些mqtt協議:MQTT(Message Queuing Telemetry Transport,訊息佇列遙測傳輸)是IBM開發的一個即時通訊協議,有可能成為物聯網

    [7] MQTT,mosquitto,Eclipse Paho---MQTT訊息格式之CONNACK訊息分析

    在上節中( [6] MQTT,mosquitto,Eclipse Paho---MQTT訊息格式之CONNECT訊息格式分析)我們分析了CONNECT訊息格式,我們知道CONNECT訊息是客戶端傳送出去的,作為對客戶端的連線請求,伺服器端同樣會有一個訊息的返回,這個訊息就是

    Mqtt協議IOS端移植2

    and cat otto client end abi 本地 top 解析 MqttFramework.h #import <Foundation/Foundation.h> #import "MQTTClient.h" #import "Busines

    Android studio沒有org.apache.http.client.HttpClient;等包問題 解決方案

    target ace mon studio clas 技術 upload rep img 以前用Eclipse做Android開發工具一直使用apache的http做網絡請求,最近換用了Android studio發現沒有辦法引用apache的包,下面是我引用的步驟

    Java中基於HTTP協議網絡編程

    copy 統一 throws 網絡編程 設置 查詢 trac enc pac java中為我們的網絡支持提供了java.net包,能夠使我們以編程的方式來訪問Web服務功能,這篇博客,就跟大家分享一下。Java中的網絡編程的知識。主要是學習下該java.net包下的