1. 程式人生 > >android 實現mqtt訊息推送,以及不停斷線重連的問題解決

android 實現mqtt訊息推送,以及不停斷線重連的問題解決

前段時間專案用到mqtt的訊息推送,整理一下程式碼,程式碼的原型是網上找的,具體哪個地址已經忘記了。

程式碼的實現是新建了一個MyMqttService,全部功能都在裡面實現,包括連伺服器,斷線重連,訂閱訊息,處理訊息,釋出訊息等基本操作。

首先新增依賴:

dependencies {
    implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
    implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
}

然後編輯AndroidManifest.xml,先新增許可權:

    <uses-permission android:name="android.permission.INTERNET" />
    <uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
    <uses-permission android:name="android.permission.WAKE_LOCK" />

再註冊service:

<service android:name="org.eclipse.paho.android.service.MqttService" />
<service
    android:name=".service.MyMqttService"
    android:enabled="true"
    android:exported="true"/>

接著進入正文MyMqttService.java,功能見註釋吧:

package com.example.nan.mqtt.service;

import android.app.Notification;
import android.app.NotificationManager;
import android.app.PendingIntent;
import android.app.Service;
import android.content.Context;
import android.content.Intent;
import android.graphics.BitmapFactory;
import android.os.Bundle;
import android.os.IBinder;
import android.support.v4.app.NotificationCompat;
import android.util.Log;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;

import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.json.JSONObject;

/**
 * @author nan
 */
public class MyMqttService extends Service {

    private static final String TAG = "nlgMqttService";
    private static final String TOPIC_TO_QA = "/s2c/task_quality/";

    private static final String publishTopic = "exampleAndroidPublishTopic";

    private MqttAndroidClient mqttAndroidClient;
    private NotificationManager mNotificationManager;


    public MyMqttService() {
    }

    @Override
    public IBinder onBind(Intent intent) {
        // TODO: Return the communication channel to the service.
        throw new UnsupportedOperationException("Not yet implemented");
    }

    @Override
    public void onCreate() {
        super.onCreate();
        Log.d(TAG, "MqttService onCreate executed");
        //mqtt伺服器的地址
        final String serverUri = "tcp://192.168.10.10:1883";
        //新建Client,以裝置ID作為client ID
        mqttAndroidClient = new MqttAndroidClient(MyMqttService.this, serverUri, getIMEI());
        mqttAndroidClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
		//連線成功
                if (reconnect) {
                    Log.d(TAG, "connectComplete: " + serverURI);
                    // Because Clean Session is true, we need to re-subscribe
                    subscribeAllTopics();
                } else {
                    Log.d(TAG, "connectComplete: " + serverURI);
                }
            }

            @Override
            public void connectionLost(Throwable cause) {
		//連線斷開
                Log.d(TAG, "connectionLost: connection was lost");
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) {
		//訂閱的訊息送達,推送notify
                String payload = new String(message.getPayload());
                Log.d(TAG, "Topic: " + topic + " ==> Payload: " + payload);
                if(mNotificationManager == null) {
                    mNotificationManager = (NotificationManager) getSystemService(Context.NOTIFICATION_SERVICE);
                }
                int roleId = SinSimApp.getApp().getRole();
                Gson gson = new Gson();
                ServerToClientMsg msg = gson.fromJson(payload, new TypeToken<ServerToClientMsg>(){}.getType());
                if(msg != null) {
		    //接受訊息
		    if(topic != null) {
			if(topic.equals(TOPIC_TO_QA)) {
				Intent intent = new Intent(MyMqttService.this, ProcessToCheckoutActivity.class);
				PendingIntent pi = PendingIntent.getActivity(MyMqttService.this, 0, intent, 0);
				NotificationCompat.Builder builder = new NotificationCompat.Builder(MyMqttService.this, TOPIC_TO_QA);
				Notification notify = builder.setSmallIcon(R.mipmap.to_quality)
						.setLargeIcon(BitmapFactory.decodeResource(getResources(), R.mipmap.to_quality))
						.setDefaults(Notification.DEFAULT_SOUND|Notification.DEFAULT_VIBRATE)//響鈴震動
						.setContentTitle("快遞來了")
						.setAutoCancel(true)
						.setContentIntent(pi)
						.setVisibility(Notification.VISIBILITY_PUBLIC)
						.setContentText("你的快遞單號:" + msg.getOrderNum())
						//不設定此項不會懸掛,false 不會出現懸掛
						.build();
				mNotificationManager.notify(2,notify);
			} 
                    }
                }
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                //即伺服器成功delivery訊息
            }
        });
	//新建連線設定
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        //斷開後,是否自動連線
        mqttConnectOptions.setAutomaticReconnect(true);
        //是否清空客戶端的連線記錄。若為true,則斷開後,broker將自動清除該客戶端連線資訊
        mqttConnectOptions.setCleanSession(false);
        //設定超時時間,單位為秒
        //mqttConnectOptions.setConnectionTimeout(2);
        //心跳時間,單位為秒。即多長時間確認一次Client端是否線上
        //mqttConnectOptions.setKeepAliveInterval(2);
        //允許同時傳送幾條訊息(未收到broker確認資訊)
        //mqttConnectOptions.setMaxInflight(10);
        //選擇MQTT版本
        mqttConnectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
        try {
            Log.d(TAG, "onCreate: Connecting to " + serverUri);
	    //開始連線
            mqttAndroidClient.connect(mqttConnectOptions, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    Log.d(TAG, "onSuccess: Success to connect to " + serverUri);
                    DisconnectedBufferOptions disconnectedBufferOptions = new DisconnectedBufferOptions();
                    disconnectedBufferOptions.setBufferEnabled(true);
                    disconnectedBufferOptions.setBufferSize(100);
                    disconnectedBufferOptions.setPersistBuffer(false);
                    disconnectedBufferOptions.setDeleteOldestMessages(false);
                    mqttAndroidClient.setBufferOpts(disconnectedBufferOptions);
		    //成功連線以後開始訂閱
                    subscribeAllTopics();
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    //連線失敗
		    Log.d(TAG, "onFailure: Failed to connect to " + serverUri);
                    exception.printStackTrace();
                }
            });
        } catch (MqttException ex) {
            ex.printStackTrace();
        }

	//service繫結notification
        Intent intent = new Intent(this, SplashActivity.class);
        intent.putExtra(SinSimApp.FROM_NOTIFICATION, true);
        //這邊設定“FLAG_UPDATE_CURRENT”是為了讓後面的Activity接收pendingIntent中Extra的資料
        PendingIntent pi = PendingIntent.getActivity(this, 0, intent, PendingIntent.FLAG_UPDATE_CURRENT);
        Notification notification = new NotificationCompat.Builder(this)
                .setContentTitle("mqtt快遞")
                .setContentText("mqtt快遞管理系統")
                .setWhen(System.currentTimeMillis())
                .setSmallIcon(R.mipmap.ic_launcher)
                .setLargeIcon(BitmapFactory.decodeResource(getResources(), R.mipmap.ic_launcher))
                .setContentIntent(pi)
                .build();
        startForeground(1, notification);
    }

    //訂閱所有訊息
    private void subscribeAllTopics() {
	subscribeToTopic(TOPIC_TO_QA);
    }

    /**
     * 訂閱訊息
     */
    public void subscribeToTopic(String subscriptionTopic) {
        try {
            mqttAndroidClient.subscribe(subscriptionTopic, 2, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    Log.d(TAG, "onSuccess: Success to Subscribed!");
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    Log.d(TAG, "onFailure: Failed to subscribe");
                }
            });
        } catch (MqttException ex) {
            Log.d(TAG, "subscribeToTopic: Exception whilst subscribing");
            ex.printStackTrace();
        }
    }

    /**
     * 釋出訊息
     */
    public void publishMessage(String msg) {
        try {
            MqttMessage message = new MqttMessage();
            message.setPayload(msg.getBytes());
            mqttAndroidClient.publish(publishTopic, message);
            Log.d(TAG, "publishMessage: Message Published: " + msg);
        } catch (MqttException e) {
            Log.d(TAG, "publishMessage: Error Publishing: " + e.getMessage());
            e.printStackTrace();
        }
    }

    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {
        Log.d(TAG, "MqttService onStartCommand executed");
        return super.onStartCommand(intent, flags, startId);
    }

    @Override
    public void onDestroy() {
        super.onDestroy();
        try {
            if(mqttAndroidClient!=null){
		//服務退出時client斷開連線
                mqttAndroidClient.disconnect();
            }
        } catch (MqttException e) {
            e.printStackTrace();
        }
        Log.d(TAG, "MqttService onDestroy executed");
    }
}
除錯過程中出現過一個小插曲:服務在有些時候會不停的斷線重連。斷線重連的設定是開了的:
mqttConnectOptions.setAutomaticReconnect(true);
但是斷開的原因找不到,當時還沒有重寫onDestory方法,就算退出應用也還在重連,一度懷疑service的開啟與關閉的問題,還系統的重新學習了一下service的使用,學完以後也沒有啥進展,然後重學mqtt的呼叫流程發揮了效果,在onDestory裡面呼叫了disconnect()方法,完了以後在退出應用以後就不會重連了,但是重新開還是繼續不停重連。到了晚上,奇怪的事情發生了,當夜深人靜,獨自加班的時候,居然再也復現不了了。為什麼呢,心想可能平時給八阿哥上的香起了效果,那就開心的回家吧。下班的路上雖然開心的吃了塊雞排,但心裡的結還是沒有開啟,為什麼呢,是道德的淪喪還是人性的扭曲,讓我獨自加班還不餓給我復現問題。突然靈光一現,想到今天特麼加班就我一個人,也就是一個人玩就是好的,玩的人多就會有問題,那麼答案就來了,跟唯一性有關的只有clientID了,特麼老子把clientID設定成使用者id了,測試用的使用者id就註冊了3個,好幾個人來回切著用,不出問題才怪。於是我默默的把clientID改成了裝置id,困擾2天的問題就這麼解決了。