Android訊息推送MQTT實戰
1 前言
年初做了一款Android TV 應用,用到了MQTT。主要實現的是類似一些景區利用大螢幕實時顯示景點人數,超過人數就不允許進入。即利用閘機裝置監控到進景區的遊客,然後通過MQTT將訊息傳送給大螢幕,最後大螢幕實時顯示景區人數,並響應一個訊息通知閘機裝置已經收到了它發過來的訊息(確保訊息到達)。這篇文章會模擬真實的使用流程進行講解,即閘機發布訊息——伺服器(代理)收到訊息轉發給大螢幕——大螢幕收到訊息後響應回去(釋出訊息)——伺服器收到訊息轉發給閘機裝置。
2 關於MQTT
2.1 簡介
MQTT(Message Queuing Telemetry Transport,訊息佇列遙測傳輸)是IBM開發的一個即時通訊協議。它是一種釋出/訂閱,極其簡單和輕量級的訊息傳遞協議,專為受限裝置和低頻寬,高延遲或不可靠的網路而設計。它的設計思想是輕巧、開放、簡單、規範,易於實現。這些特點使得它對很多場景來說都是很好的選擇,特別是對於受限的環境如機器與機器的通訊(M2M)以及物聯網環境。相對於XMPP,MQTT更加輕量級,並且佔用的寬頻低。
2.2 特點
MQTT協議有以下特點:
- 使用釋出/訂閱訊息模式,提供一對多的訊息釋出,解除應用程式耦合。
- 對負載內容遮蔽的訊息傳輸。
- 使用 TCP/IP 提供網路連線。
- 有三種訊息釋出服務質量:
- qos為0:“至多一次”,訊息釋出完全依賴底層 TCP/IP 網路。會發生訊息丟失或重複。這一級別可用於如下情況,環境感測器資料,丟失一次讀記錄無所謂,因為不久後還會有第二次傳送。
- qos為1:“至少一次”,確保訊息到達,但訊息重複可能會發生。這一級別可用於如下情況,你需要獲得每一條訊息,並且訊息重複傳送對你的使用場景無影響。
- qos為2:“只有一次”,確保訊息到達一次。這一級別可用於如下情況,在計費系統中,訊息重複或丟失會導致不正確的結果。
- 小型傳輸,開銷很小(固定長度的頭部是 2 位元組),協議交換最小化,以降低網路流量。
使用 Last Will 和 Testament 特性通知有關各方客戶端異常中斷的機制。
2.3 MQTT體系結構
該體系結構圖是結合文章開頭說的例子畫出來的,能很好的描述MQTT在實際運用中的三種身份。即進景區入口配置一臺閘機裝置作為釋出者(Publisher),當閘機裝置監控到有遊客進入的時候會發佈一個帶主題(Topic)的訊息(例如主題為“tourist_enter”)給伺服器(MQTT-Broker),當伺服器接收到釋出過來的訊息後,會進行基於主題的過濾,將訊息轉發給訂閱了該主題的訂閱者。 而景區大螢幕作為訂閱者(Subscriber),訂閱的主題也是“tourist_enter”,這樣就能接收到伺服器轉發過來的訊息,收到訊息後在大螢幕上實時顯示當前景區人數即可。
該結構圖中的閘機裝置和大螢幕都是客戶端,都可以進行釋出和訂閱。例如大螢幕收到訊息後也可以釋出一個訊息通知閘機裝置已經收到了它發過來的訊息。
3 MQTT伺服器搭建
想要使用MQTT,首先需要搭建一個MQTT的伺服器(在公司一般是後臺人員負責搭建)。一般前端人員為了方便測試都會先使用第三方提供的伺服器, ofollow,noindex">官方 推薦了很多種伺服器,我這裡選用的是Apollo(屬於Apache ActiveMQ)。
1. 下載、解壓
點選 下載地址 ,選擇最適合你的作業系統的版本進行下載,我這裡用的是Windows,進行如下選擇:
下載後進行解壓,我這裡解壓到D盤根目錄下(D:\apache-apollo-1.7.1)。
2. 建立伺服器例項
命令列進入解壓檔案的bin目錄下(例如:cd D:\apache-apollo-1.7.1\bin),然後輸入apollo create mybroker(其中mybroker為自定義的伺服器名稱)建立伺服器例項。具體如下圖:
之後會在bin目錄下生成mybroker資料夾,其中mybroker資料夾下的etc\apollo.xml檔案下是配置伺服器資訊的檔案,etc\users.properties檔案包含連線MQTT伺服器時用到的使用者名稱和密碼,注意這裡只能修改密碼(發現很多部落格在沒有驗證的情況下就說使用者名稱和密碼都在這裡修改),如果要修改使用者名稱需要到etc\groups.properties檔案下去修改。etc\groups.properties檔案下的使用者名稱與etc\users.properties檔案下的密碼是一一對應的,如下表示一個組中配置了兩個使用者分別是admin與wildma,然後這兩個使用者名稱對應的密碼分別是password與123456
3. 開啟伺服器
進入mybroker資料夾下的bin目錄下,輸入apollo-broker.cmd run開啟伺服器。看到如下介面表示開啟成功。
4. 驗證是否安裝成功
最後在瀏覽器輸入http://127.0.0.1:61680/,能成功開啟介面就表示安裝成功了。可以用上面配置的兩個使用者名稱進行登入。
4 除錯MQTT的客戶端——mqttfx 的使用
為了方便除錯MQTT,我這裡選用mqttfx作為閘機裝置客戶端。具體使用如下:
-
下載
點選 下載地址 ,選擇最適合你的作業系統的版本進行下載。如下圖:
-
安裝
下載後一路點選下一步即可安裝成功,安裝成功後開啟軟體介面。如下圖:
-
配置
點選上圖中的設定,新增一個新的配置檔案。分別填寫配置檔名稱、伺服器地址(由於伺服器就是本機,所以這裡用本機的IP地址即可,ipconfig/all可獲取IP地址)、埠號(開啟伺服器後會顯示接受連線的地址:Accepting connections at: tcp://0.0.0.0:61613,用這裡的埠號61613即可,見上文中“開啟伺服器”後的圖片)、使用者名稱、密碼,點選OK即可。如下圖:
-
訂閱訊息
選擇剛剛新增的配置檔案“閘機裝置”,點選"Connect"連線伺服器。點選“Subscribe”,設定一個Topic(例如tourist_enter),點選Topic右側的“Subscribe”進行訊息訂閱。如下圖:
-
釋出訊息
點選“Publish”,輸入剛剛訂閱的Topic (tourist_enter),輸入需要釋出的訊息內容(tourist enter),點選Topic右側的“Publish”進行訊息釋出。如下圖:
再返回訂閱介面就能看到剛剛釋出的訊息,如下圖:
5 Android中MQTT的使用
Android中使用MQTT需要使用到Paho Android Service庫,Paho Android Service是一個用Java編寫的MQTT客戶端庫。
GitHub地址: https://github.com/eclipse/paho.mqtt.android
5.1 整合
- 在module的build.gradle檔案中新增依賴
repositories { maven { url "https://repo.eclipse.org/content/repositories/paho-snapshots/" } } dependencies { compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0' compile 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1' }
- 在 AndroidManifest.xml 新增限權
<uses-permission android:name="android.permission.INTERNET" /> <uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" /> <uses-permission android:name="android.permission.WAKE_LOCK" />
- 在 AndroidManifest.xml 註冊Service (MyMqttService為自己寫的服務,下文會講到)
<service android:name="org.eclipse.paho.android.service.MqttService" /> <!--MqttService--> <service android:name="com.dongyk.service.MyMqttService"/> <!--MyMqttService-->
5.2 具體程式碼
5.2.1 Android中使用MQTT最主要的就是以下幾個方法:
- connect:連線MQTT伺服器,這裡主要講3個引數的方法,如下:
@Override public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback) throws MqttException { //... }
引數options:用來攜帶連線伺服器的一系列引數,例如使用者名稱、密碼等。
引數userContext:可選物件,用於向回撥傳遞上下文。一般傳null即可。
引數callback:用來監聽MQTT是否連線成功的回撥
- publish:釋出訊息,這裡使用四個引數的方法,如下:
@Override public IMqttDeliveryToken publish(String topic, byte[] payload, int qos, boolean retained) throws MqttException, MqttPersistenceException { //... }
引數topic:釋出訊息的主題
引數payload:訊息的位元組陣列
引數qos:提供訊息的服務質量,可傳0、1或2
引數retained:是否在伺服器保留斷開連線後的最後一條訊息
- subscribe:訂閱訊息,這裡主要講2個引數的方法,如下:
@Override public IMqttToken subscribe(String topic, int qos) throws MqttException, MqttSecurityException { //... }
引數topic:訂閱訊息的主題
引數qos:訂閱訊息的服務質量,可傳0、1或2
5.2.2 MQTT服務——MyMqttService
下面寫一個 Service 來實現MQTT在Android運用中的connect、publish、subscribe
package com.wildma.mqttandroidclient; import android.app.Service; import android.content.Context; import android.content.Intent; import android.net.ConnectivityManager; import android.net.NetworkInfo; import android.os.Build; import android.os.Handler; import android.os.IBinder; import android.support.annotation.Nullable; import android.support.annotation.RequiresApi; import android.util.Log; import android.widget.Toast; import org.eclipse.paho.android.service.MqttAndroidClient; import org.eclipse.paho.client.mqttv3.IMqttActionListener; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.IMqttToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; /** * Authorwildma * Githubhttps://github.com/wildma * CreateDate2018/11/08 * Desc${MQTT服務} */ public class MyMqttService extends Service { public final String TAG = MyMqttService.class.getSimpleName(); private static MqttAndroidClientmqttAndroidClient; privateMqttConnectOptions mMqttConnectOptions; publicString HOST= "tcp://192.168.0.102:61613";//伺服器地址(協議+地址+埠號) publicString USERNAME= "admin";//使用者名稱 publicString PASSWORD= "password";//密碼 public static String PUBLISH_TOPIC= "tourist_enter";//釋出主題 public static String RESPONSE_TOPIC = "message_arrived";//響應主題 @RequiresApi(api = 26) publicString CLIENTID= Build.VERSION.SDK_INT >= Build.VERSION_CODES.O ? Build.getSerial() : Build.SERIAL;//客戶端ID,一般以客戶端唯一識別符號表示,這裡用裝置序列號表示 @Override public int onStartCommand(Intent intent, int flags, int startId) { init(); return super.onStartCommand(intent, flags, startId); } @Nullable @Override public IBinder onBind(Intent intent) { return null; } /** * 開啟服務 */ public static void startService(Context mContext) { mContext.startService(new Intent(mContext, MyMqttService.class)); } /** * 釋出 (模擬其他客戶端釋出訊息) * * @param message 訊息 */ public static void publish(String message) { String topic = PUBLISH_TOPIC; Integer qos = 2; Boolean retained = false; try { //引數分別為:主題、訊息的位元組陣列、服務質量、是否在伺服器保留斷開連線後的最後一條訊息 mqttAndroidClient.publish(topic, message.getBytes(), qos.intValue(), retained.booleanValue()); } catch (MqttException e) { e.printStackTrace(); } } /** * 響應 (收到其他客戶端的訊息後,響應給對方告知訊息已到達或者訊息有問題等) * * @param message 訊息 */ public void response(String message) { String topic = RESPONSE_TOPIC; Integer qos = 2; Boolean retained = false; try { //引數分別為:主題、訊息的位元組陣列、服務質量、是否在伺服器保留斷開連線後的最後一條訊息 mqttAndroidClient.publish(topic, message.getBytes(), qos.intValue(), retained.booleanValue()); } catch (MqttException e) { e.printStackTrace(); } } /** * 初始化 */ private void init() { String serverURI = HOST; //伺服器地址(協議+地址+埠號) mqttAndroidClient = new MqttAndroidClient(this, serverURI, CLIENTID); mqttAndroidClient.setCallback(mqttCallback); //設定監聽訂閱訊息的回撥 mMqttConnectOptions = new MqttConnectOptions(); mMqttConnectOptions.setCleanSession(true); //設定是否清除快取 mMqttConnectOptions.setConnectionTimeout(10); //設定超時時間,單位:秒 mMqttConnectOptions.setKeepAliveInterval(20); //設定心跳包傳送間隔,單位:秒 mMqttConnectOptions.setUserName(USERNAME); //設定使用者名稱 mMqttConnectOptions.setPassword(PASSWORD.toCharArray()); //設定密碼 // last will message boolean doConnect = true; String message = "{\"terminal_uid\":\"" + CLIENTID + "\"}"; String topic = PUBLISH_TOPIC; Integer qos = 2; Boolean retained = false; if ((!message.equals("")) || (!topic.equals(""))) { // 最後的遺囑 try { mMqttConnectOptions.setWill(topic, message.getBytes(), qos.intValue(), retained.booleanValue()); } catch (Exception e) { Log.i(TAG, "Exception Occured", e); doConnect = false; iMqttActionListener.onFailure(null, e); } } if (doConnect) { doClientConnection(); } } /** * 連線MQTT伺服器 */ private void doClientConnection() { if (!mqttAndroidClient.isConnected() && isConnectIsNomarl()) { try { mqttAndroidClient.connect(mMqttConnectOptions, null, iMqttActionListener); } catch (MqttException e) { e.printStackTrace(); } } } /** * 判斷網路是否連線 */ private boolean isConnectIsNomarl() { ConnectivityManager connectivityManager = (ConnectivityManager) this.getApplicationContext().getSystemService(Context.CONNECTIVITY_SERVICE); NetworkInfo info = connectivityManager.getActiveNetworkInfo(); if (info != null && info.isAvailable()) { String name = info.getTypeName(); Log.i(TAG, "當前網路名稱:" + name); return true; } else { Log.i(TAG, "沒有可用網路"); /*沒有可用網路的時候,延遲3秒再嘗試重連*/ new Handler().postDelayed(new Runnable() { @Override public void run() { doClientConnection(); } }, 3000); return false; } } //MQTT是否連線成功的監聽 private IMqttActionListener iMqttActionListener = new IMqttActionListener() { @Override public void onSuccess(IMqttToken arg0) { Log.i(TAG, "連線成功 "); try { mqttAndroidClient.subscribe(PUBLISH_TOPIC, 2);//訂閱主題,引數:主題、服務質量 } catch (MqttException e) { e.printStackTrace(); } } @Override public void onFailure(IMqttToken arg0, Throwable arg1) { arg1.printStackTrace(); Log.i(TAG, "連線失敗 "); doClientConnection();//連線失敗,重連(可關閉伺服器進行模擬) } }; //訂閱主題的回撥 private MqttCallback mqttCallback = new MqttCallback() { @Override public void messageArrived(String topic, MqttMessage message) throws Exception { Log.i(TAG, "收到訊息: " + new String(message.getPayload())); //收到訊息,這裡彈出Toast表示。如果需要更新UI,可以使用廣播或者EventBus進行傳送 Toast.makeText(getApplicationContext(), "messageArrived: " + new String(message.getPayload()), Toast.LENGTH_LONG).show(); //收到其他客戶端的訊息後,響應給對方告知訊息已到達或者訊息有問題等 response("message arrived"); } @Override public void deliveryComplete(IMqttDeliveryToken arg0) { } @Override public void connectionLost(Throwable arg0) { Log.i(TAG, "連線斷開 "); doClientConnection();//連線斷開,重連 } }; @Override public void onDestroy() { try { mqttAndroidClient.disconnect(); //斷開連線 } catch (MqttException e) { e.printStackTrace(); } super.onDestroy(); } }
該 MyMqttService 類的大概邏輯就是開啟服務後,呼叫init()方法初始化各個引數,包括伺服器地址、使用者名稱、密碼等等,然後呼叫doClientConnection()方法連線MQTT伺服器,iMqttActionListener用來監聽MQTT是否連線成功,連線成功則訂閱主題。mqttCallback為訂閱主題的回撥,收到訊息後會執行該回調中的messageArrived()方法,拿到訊息後進行UI更新,並呼叫response()方法響應給對方告知訊息已到達或者訊息有問題等。
5.2.3 開啟服務
在MainActivity中開啟服務,這裡為了方便不做UI更新,所以就一行開啟服務的程式碼,如下:
import android.os.Bundle; import android.support.v7.app.AppCompatActivity; import android.view.View; public class MainActivity extends AppCompatActivity { @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); MyMqttService.startService(this); //開啟服務 } }
6 模擬真實場景
還是以文章開頭說的例子來講,現在拿mqttfx客戶端作為閘機裝置,上面的Android程式碼執行後作為大螢幕。
-
將大螢幕與伺服器連線
即將大螢幕APK執行到Android TV上,沒有TV可以用Android手機代替。記得程式碼中的釋出主題設定為“tourist_enter”,響應主題設定為“message_arrived”。
-
將閘機裝置與伺服器連線
選擇閘機裝置——點選連線——釋出主題設定為“tourist_enter”,如下圖:
-
釋出
點選步驟2圖中的Publish按鈕進行釋出
-
大螢幕收到訊息
這時候大螢幕收到伺服器轉發過來的訊息,就會在大螢幕上顯示進場人數,並響應給對方告知訊息已到達。程式碼中為了簡單就彈個Toast表示,具體顯示就不貼圖了。
-
閘機裝置收到訊息
這時候mqttfx切換到Subscribe介面就可以看到大螢幕響應回來的訊息,如下:
如上流程就是大概模擬我在開發中用到的MQTT使用流程,當然我的真實專案並沒有那麼簡單,還包括各種資料和UI互動顯示。希望模擬這種真實的使用流程進行講解能讓各位更好的理解MQTT的使用,有不足的請指出。
專案地址: MqttAndroidClient