1. 程式人生 > >MQTT簡單demo(java)

MQTT簡單demo(java)

停用 隊列 設置 msg 的區別 多個 指定 throwable ica

  上次已經簡單的談了一些MQTT協議的一些知識,今天就來就上次的知識具體的Java實現。

  現在就來具體說說實現這一步吧。中間的時間也是有點久。

  MQTT消息的發送和訂閱都是依賴MQTT服務器的,沒有MQTT服務器,你的客戶端是無法訂閱和發送消息的。所以在最開始的時候,可以選擇性的在你的電腦上面安裝一個MQTT服務器。MQTT服務器有很多,大家也可以在網上去找一些安裝教程,這裏因為和我要講內容關系不大,所以不再累述。

  MQTT協議中是沒有發送者和接收者·的概念,所有的連接都是用戶,所以一個MQTT連接既可以發送消息,也可以接收消息。就等於所有的連接都是客戶端。下面我的客戶端代碼也是如此,因為公司這邊接收的信息先是要進行認證,認證成功後再接收有用的信息。這時,客戶端在根據設備的信息來控制網關上面的設備,達到遠程控制設備的目的。因為要使用服務器來轉發消息,所以對於服務器的測試也是比較重要的,但是我使用的是公司的服務器,所以這一塊我的了解比較少。但是我這邊有一些工具,谷歌瀏覽器的插件MQTTLens。可能會幫助你。(需要翻閱墻體)

  MQTT使用的庫也是有很多的,下面的網址也是列舉了MQTT支持的庫,有java的,也有c的。網址如下:https://github.com/mqtt/mqtt.github.io/wiki/libraries。因為最開始我的接觸還是比較淺,使用的是:Fusesource mqtt-client。所以java的demo也是基於這個庫的,但是後來和spring整合的時候發現有一些問題,因為spring支持的只有一個庫,就是Eclipse Paho Java。但是原理都是一樣的,大家可以自己去決定,我的簡單的demo代碼還是基於Fusesource mqtt-client。在下一篇Spring和MQTT整合中使用的是Eclipse Paho Java。

  下面就說一說具體的思路,這邊我的代碼是基於公司的網關需求,所以先說一說公司網關的具體流程。首先,網關會一直發送身份驗證消息,等待客戶端認證,客戶端認證通過後,會發送具體有用的信息。客戶端這時在根據網關信息發送控制命令,到達控制的目的。在這個過程中,客戶端有訂閱和發送,所以一個客戶端就練習了發送消息和訂閱消息。這就是公司的具體操作流程。下面就說一說代碼的流程。

  運行時要使用jar包,也可使用maven,但是使用maven時要註意版本。

  具體的jar包和maven依賴在網址:https://gitee.com/iots/mqtt-client

  依賴為:

<dependency>

  <groupId>org.fusesource.mqtt-client</groupId>

  <artifactId>mqtt-client</artifactId>

  <version>1.12</version>

</dependency>

下面開始編寫demo

首先先要配置MQTT的一些配置,配置比較多,也很繁瑣。

主要是配置主機號和端口號,根據自己的配置編寫代碼,在配置其他的一些細節配置,主要是和連接有關的。

  代碼如下:

  

     // MQTT設置說明
        // 設置主機號
        mqtt.setHost("tcp://10.168.5.208:1883");
        // 用於設置客戶端會話的ID。在setCleanSession(false);被調用時,MQTT服務器利用該ID獲得相應的會話。此ID應少於23個字符,默認根據本機地址、端口和時間自動生成
        mqtt.setClientId("876543210");
        // 若設為false,MQTT服務器將持久化客戶端會話的主體訂閱和ACK位置,默認為true
        mqtt.setCleanSession(false);
        // 定義客戶端傳來消息的最大時間間隔秒數,服務器可以據此判斷與客戶端的連接是否已經斷開,從而避免TCP/IP超時的長時間等待
        mqtt.setKeepAlive((short) 60);
        // 服務器認證用戶名
        mqtt.setUserName("admin");
        // 服務器認證密碼
        mqtt.setPassword("admin");
        // 設置“遺囑”消息的話題,若客戶端與服務器之間的連接意外中斷,服務器將發布客戶端的“遺囑”消息
        mqtt.setWillTopic("willTopic");
        // 設置“遺囑”消息的內容,默認是長度為零的消息
        mqtt.setWillMessage("willMessage");
        // 設置“遺囑”消息的QoS,默認為QoS.ATMOSTONCE
        mqtt.setWillQos(QoS.AT_LEAST_ONCE);
        // 若想要在發布“遺囑”消息時擁有retain選項,則為true
        mqtt.setWillRetain(true);
        // 設置版本
        mqtt.setVersion("3.1.1");
        // 失敗重連接設置說明
        // 客戶端首次連接到服務器時,連接的最大重試次數,超出該次數客戶端將返回錯誤。-1意為無重試上限,默認為-1
        mqtt.setConnectAttemptsMax(10L);
        // 客戶端已經連接到服務器,但因某種原因連接斷開時的最大重試次數,超出該次數客戶端將返回錯誤。-1意為無重試上限,默認為-1
        mqtt.setReconnectAttemptsMax(3L);
        // 首次重連接間隔毫秒數,默認為10ms
        mqtt.setReconnectDelay(10L);
        // 重連接間隔毫秒數,默認為30000ms
        mqtt.setReconnectDelayMax(30000L);
        // 設置重連接指數回歸。設置為1則停用指數回歸,默認為2
        mqtt.setReconnectBackOffMultiplier(2);

        // Socket設置說明
        // 設置socket接收緩沖區大小,默認為65536(64k)
        mqtt.setReceiveBufferSize(65536);
        // 設置socket發送緩沖區大小,默認為65536(64k)
        mqtt.setSendBufferSize(65536);
        // 設置發送數據包頭的流量類型或服務類型字段,默認為8,意為吞吐量最大化傳輸
        mqtt.setTrafficClass(8);

        // 帶寬限制設置說明
        // 設置連接的最大接收速率,單位為bytes/s。默認為0,即無限制
        mqtt.setMaxReadRate(0);
        // 設置連接的最大發送速率,單位為bytes/s。默認為0,即無限制
        mqtt.setMaxWriteRate(0);

        // 選擇消息分發隊列
        // 若沒有調用方法setDispatchQueue,客戶端將為連接新建一個隊列。如果想實現多個連接使用公用的隊列,顯式地指定隊列是一個非常方便的實現方法
        mqtt.setDispatchQueue(Dispatch.createQueue("foo"));  

  上面都是一些配置的問題,具體情況自己決定配置。具體的配置也可以參考下面的網址,這個網址也有詳細的描述:https://gitee.com/iots/mqtt-client。

  下面開始講講連接和訂閱和發送主題

  fusesource提供三種mqtt client api,分別為阻塞API,基於Futur的API和回調API。

  其中阻塞API是在MQTT.connectBlocking方法建立連接和提供阻斷API的連接。

  基於Futur的API則是:在MQTT.connectFuture方法建立連接,為您提供了一個與結合Futur的連接。所有操作的連接是無阻塞的,並且經由返回的結果。

  回調API是最復雜的也是性能最好的,另外兩種均是對回調API的封裝。

  因為回調API有些復雜,現在只是介紹回調API的封裝。就是前兩個,前兩個的區別是第一個為阻塞的,第二個不是阻塞。下面開始代碼演示。

  第一個阻塞API。代碼如下:

     // 使用future連接
        FutureConnection connection = mqtt.futureConnection();
        Future<Void> f1 = connection.connect();
        f1.await();
        // 訂閱消息
        Future<byte[]> f2 = connection.subscribe(new Topic[] { new Topic("datasources/1/1", QoS.AT_LEAST_ONCE) });
        //
        byte[] qoses = f2.await();

        // 發送身份驗證消息.
        // Future<Void> f3 = connection.publish("foo", "Hello".getBytes(),
        // QoS.AT_LEAST_ONCE, false);
        // 接收訂閱消息..
        Future<Message> receive = connection.receive();
        // 打印消息.
        Message message = receive.await();
        System.out.println(String.valueOf(message.getPayloadBuffer()));
        // 回應
        message.ack();
        //
        Future<Void> f4 = connection.disconnect();
        f4.await();

  第三個是最難的,我這邊的代碼也是有點亂,直接上代碼吧。

     // 監聽
        connection.listener(new Listener() {
            @Override
            public void onPublish(UTF8Buffer topicmsg, Buffer msg, Runnable ack) {
                // utf-8 is used for dealing with the garbled
                String topic = topicmsg.utf8().toString();
                String payload = msg.utf8().toString();
                System.out.println(topic + "  " + payload);
                String Amsg = AuthenticationSendDemo.Authentication(topic, payload);
                if (topic.equals("datasources/req")) {
                    // 重起一個阻塞線程
                    connection.getDispatchQueue().execute(new Runnable() {
                        public void run() {
                            connection.publish("datasources/17/01/req_ack", Amsg.getBytes(), QoS.AT_LEAST_ONCE, false,
                                    new Callback<Void>() {
                                        @Override
                                        public void onSuccess(Void args) {
                                            // 表示發布主題成功
                                            System.out.println("發布成功!");
                                            System.out.println("發布的消息" + Amsg);

                                        }

                                        @Override
                                        public void onFailure(Throwable throwable) {
                                            // 表示發布主題失敗
                                            System.out.println("發布失敗!");
                                        }
                                    });
                        }
                    });
                }
                // 表示監聽成功
                ack.run();
            }

            @Override
            public void onFailure(Throwable value) {
                // 表示監聽失敗
            }

            // execute only once when connection is ended
            @Override
            public void onDisconnected() {
                // 表示監聽到斷開連接
                System.out.println("斷開連接!!");
            }

            // execute only once when connecting started
            @Override
            public void onConnected() {
                // 表示監聽到連接成功
                System.out.println("haha");
                System.out.println();
            }
        });

  因為代碼中使用到了線程和回調,我對於這兩個掌握的也不是很好,也不再這裏亂扯,有大佬知道比較好的寫法最好指點一下。在這裏感謝。

  三種寫法都寫完了,下面談一談感想和中間遇到的問題。

  以為看具體的文檔實在太多了,現在公司還在忙著趕項目,我這邊時間也不是很多,代碼的整理以後有時間在說。我感覺最重要的還是對於協議的一些掌握和體會,這些要比上面的代碼重要的多,因為你最終的代碼還是要和項目整合的,和Spring整合的時候你會發現這些都是框架提供好了,你需要做的就是填參數,但是整合中遇到的問題的解決辦法都是你從寫上面的代碼中得到的。

因為剛開始寫代碼,所以代碼中的註釋也是非常多的,這裏也不再累述。寫上面的代碼的時候遇到了很多的問題,解決的網站都在我第一篇MQTT博客中,比如MQTT的官網,網上的文章都是抄的,要不就是一知半解(我也是)。最終還是看自己的深入體會。

  就這樣吧,結束。

MQTT簡單demo(java)