1. 程式人生 > >MQTT+ActiveMQ實現訊息推送(移動端)

MQTT+ActiveMQ實現訊息推送(移動端)

這個小程式是我導師給我佈置的一個任務,網上教程不是很多,遇到的一些困難都是自己解決的,所以寫出來分享一下,有什麼問題大家可以留言,盡力幫大家解決。

來到解壓目錄下,進入bin目錄下的win64資料夾(如何是32位機器則進入win32),執行activemq.bat。如果出現拒絕訪問則右鍵以管理員身份執行,出現下圖時說明activemq啟動成功。
這裡寫圖片描述

伺服器啟動成功後,即可開啟移動端點選對應按鈕進行連線。

如何出現MqttException的異常,就是連線伺服器失敗, 請檢查伺服器是否開啟以及URL和埠是否正確

連線成功後,移動端就可以接收activeMQ訊息伺服器上傳過來的訊息,效果圖如下
這裡寫圖片描述

我用的是org.eclipse.paho.client.mqttv3,裡面封裝了MqttClient、MqttConnection、MqttTopic、MqttMessage等類,附帶下service的程式碼,帶有比較詳細的註解,可以看下各種類的作用,有需要的可以去查下原始碼

public class MQTTService extends Service {
    //訊息伺服器的URL
    public static final String BROKER_URL = "tcp://192.168.191.6:1883";
    //客戶端ID,用來標識一個客戶,可以根據不同的策略來生成
public static final String clientId = "android-client"; //訂閱的主題名 public static final String TOPIC = "test"; //mqtt客戶端類 private MqttClient mqttClient; //mqtt連線配置類 private MqttConnectOptions options; private String userName = "admin"; private String passWord = "password"
; public IBinder onBind(Intent intent) { return null; } @Override public void onStart(Intent intent, int startId) { try { //在服務開始時new一個mqttClient例項,客戶端ID為clientId,第三個引數說明是持久化客戶端,如果是null則是非持久化 mqttClient = new MqttClient(BROKER_URL, clientId, new MemoryPersistence()); // MQTT的連線設定 options = new MqttConnectOptions(); // 設定是否清空session,這裡如果設定為false表示伺服器會保留客戶端的連線記錄,這裡設定為true表示每次連線到伺服器都以新的身份連線 //換而言之,設定為false時可以客戶端可以接受離線訊息 options.setCleanSession(false); // 設定連線的使用者名稱 options.setUserName(userName); // 設定連線的密碼 options.setPassword(passWord.toCharArray()); // 設定超時時間 單位為秒 options.setConnectionTimeout(10); // 設定會話心跳時間 單位為秒 伺服器會每隔1.5*20秒的時間向客戶端傳送個訊息判斷客戶端是否線上,但這個方法並沒有重連的機制 options.setKeepAliveInterval(20); // 設定回撥 回撥類的說明看後面 mqttClient.setCallback(new PushCallback(this)); MqttTopic topic = mqttClient.getTopic(TOPIC); //setWill方法,如果專案中需要知道客戶端是否掉線可以呼叫該方法。設定最終埠的通知訊息 options.setWill(topic, "close".getBytes(), 2, true); //mqtt客戶端連線伺服器 mqttClient.connect(options); //mqtt客戶端訂閱主題 //在mqtt中用QoS來標識服務質量 //QoS=0時,報文最多傳送一次,有可能丟失 //QoS=1時,報文至少傳送一次,有可能重複 //QoS=2時,報文只發送一次,並且確保訊息只到達一次。 int[] Qos = {1}; String[] topic1 = {TOPIC}; mqttClient.subscribe(topic1, Qos); } catch (MqttException e) { Toast.makeText(getApplicationContext(), "Something went wrong!" + e.getMessage(), Toast.LENGTH_LONG).show(); e.printStackTrace(); } super.onStart(intent, startId); } @Override public void onDestroy() { try { mqttClient.disconnect(0); } catch (MqttException e) { Toast.makeText(getApplicationContext(), "Something went wrong!" + e.getMessage(), Toast.LENGTH_LONG).show(); e.printStackTrace(); } } } public class PushCallback implements MqttCallback { //回撥方法類需要實現MqttPushBack介面,用於訊息推送過程中某一事件觸發後進行相應的處理 private ContextWrapper context; public PushCallback(ContextWrapper context) { this.context = context; } @Override public void connectionLost(Throwable cause) { //連線斷開時的回撥方法,可以在這裡重新連線 } @SuppressLint("NewApi") @Override public void messageArrived(MqttTopic topic, MqttMessage message) throws Exception { //有新訊息到達時的回撥方法 final NotificationManager notificationManager = (NotificationManager) context.getSystemService(Context.NOTIFICATION_SERVICE); final Notification notification; final Intent intent = new Intent(context, BlackIceActivity.class); final PendingIntent activity = PendingIntent.getActivity(context, 0, intent, 0); Notification.Builder builder = new Notification.Builder(context) .setAutoCancel(true) .setContentTitle("Message") .setContentText(new String(message.getPayload()) + " ") .setContentIntent(activity) .setSmallIcon(R.drawable.snow) .setWhen(System.currentTimeMillis()) .setOngoing(true); notification=builder.getNotification(); notification.flags |= Notification.FLAG_AUTO_CANCEL; notification.number += 1; notificationManager.notify(0, notification); } @Override public void deliveryComplete(MqttDeliveryToken token) { //成功釋出某一訊息後的回撥方法 } }

這裡順帶介紹下activeMQ的web console,通過它我們可以很快捷輕鬆地來發布推送的內容。

用瀏覽器開啟127.0.0.1:8161/admin(具體URL取決於部署IP),登陸的使用者名稱密碼預設均為admin
這裡寫圖片描述

我們可以看到當前伺服器中存在的topic,在subscribers頁面下我們還可以看到訂閱者的列表

那麼如何釋出訊息呢?在topic頁面下點選某一topic
這裡寫圖片描述

這裡有很多屬性可以根據自己的需要進行設定,點選send就可以對該topic的訂閱使用者進行推送了