1. 程式人生 > >阿里雲訊息佇列MQTT踩坑之路(阿里雲MQTT Android客戶端)

阿里雲訊息佇列MQTT踩坑之路(阿里雲MQTT Android客戶端)

最近需求需要實現收款語音的播報,如果不考慮費用問題,使用推送與百度AL開發平臺的語音合成功能就可也實現收款語音的播報功能,但是考慮到費用問題,同時還考慮到ios與Android可以同步使用的問題(ps:ios如果使用推送需要把應用上架到蘋果商店,需要有證書,而且還需要通過稽核才能進行推送。但是目前ios的簽名一般都不上架,而是進行企業籤,使用第三方的分發進行軟體的更新,所以一般都不考慮上架)。由此,第二種方案就出來了,使用訊息佇列MQTT進行收款金額的推送,在移動端進行收款語音的播報,下面總結一下我在對接阿里雲MQTT的踩坑的總結(因為阿里雲給出的Android開發文件太少了,只能一步步摸索)。

首先,需要對接阿里雲的MQTT功能,就需要有一個阿里雲的訊息佇列的賬號,這些就略過了,直接進行到主題。

一、使用MQTT使用到的幾點

1、topic管理(也就是訊息主題)

2、生產者管理(訊息的傳送者,也就是服務端傳送的資料)

3、消費者管理(訊息的接收者,也就是移動端接收的資料)

4、例項管理

5、Group Id管理

附上圖片:

以上這些一般都是後臺人員進行建立,但是在測試階段,移動端開發人員也會自己去建立一個測試的例項進行測試使用,所以此部分也是需要了解的,具體建立參考官方文件即可。

二、Android接入示例

建立好以上需要的資料之後,就可以進行訊息的接收操作了,也就是Android客戶端。

1、Android 依賴新增

在 Android 工程中找到 app 下的配置檔案 build.gradle。新增依賴如下。

dependencies{
compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0' 
compile group: 'commons-codec', name: 'commons-codec', version: '1.5'
}

2、新增許可權

<uses-permission android:name="android.permission.WAKE_LOCK" />
<uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />

<uses-permission android:name="android.permission.READ_EXTERNAL_STORAGE" />
<uses-permission android:name="android.permission.INTERNET" />

3、註冊service

<service android:name=".mqtt.service.MqttService" />

同時,這裡還有Android demo,但是demo是java的程式碼,並不能在Android機子上跑起來。

注意,對於第一次對接MQTT的開發人員來說,總是會去看一下接入示例,下載示例的demo執行跑一遍,但是這裡有一個坑,阿里雲的MQTT Android接入示例並不是Android客戶端。附上圖片:

Android客戶端github地址:

以上的官方Android接入示例對於首次接入MQTT的人來說,一般都會去下載這個demo進行執行,但是這不是Android的客戶端,客戶端就是以上的github地址。附上圖片標明官方文件的Android客戶端位置:

三、使用Android客戶端

在github上下載下來的專案包含三個檔案,如圖所示:

1、paho.mqtt.android.example 這個是我們需要使用到的接收訊息的demo

2、org.eclipse.paho.android.service 這個是我們接收使用到的service的  library

3、org.eclipse.paho.android.sample 這個是模擬傳送的demo(這裡我沒有使用,而是使用後臺的傳送程式碼或者官網上生產者那裡的傳送進行傳送訊息,就不做說明了)

四、官方Android客戶端demo的使用

在官方的demo中,我在Android studio上運行了一遍,是可以安裝,但是在你使用了自己的伺服器地址之後,總是會連線不上,不斷處於重連的情況。

下面貼出官方的對接程式碼:

public class PahoExampleActivity1 extends AppCompatActivity{
    private HistoryAdapter mAdapter;

    MqttAndroidClient mqttAndroidClient;

    final String serverUri = "tcp://iot.eclipse.org:1883";

    String clientId = "AndroidExampleClient";
    final String subscriptionTopic = "exampleAndroidTopic";
    final String publishTopic = "exampleAndroidPublishTopic";
    final String publishMessage = "Hello World!";


    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_scrolling);
        Toolbar toolbar = (Toolbar) findViewById(R.id.toolbar);
        setSupportActionBar(toolbar);

        FloatingActionButton fab = (FloatingActionButton) findViewById(R.id.fab);
        fab.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View view) {
                publishMessage();
            }
        });


        RecyclerView mRecyclerView = (RecyclerView) findViewById(R.id.history_recycler_view);
        RecyclerView.LayoutManager mLayoutManager = new LinearLayoutManager(this);
        mRecyclerView.setLayoutManager(mLayoutManager);

        mAdapter = new HistoryAdapter(new ArrayList<String>());
        mRecyclerView.setAdapter(mAdapter);

        clientId = clientId + System.currentTimeMillis();

        mqttAndroidClient = new MqttAndroidClient(getApplicationContext(), serverUri, clientId);
        mqttAndroidClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {

                if (reconnect) {
                    addToHistory("Reconnected to : " + serverURI);
                    // Because Clean Session is true, we need to re-subscribe
                    subscribeToTopic();
                } else {
                    addToHistory("Connected to: " + serverURI);
                }
            }

            @Override
            public void connectionLost(Throwable cause) {
                addToHistory("The Connection was lost.");
                Log.e("TAG","失敗資訊"+cause.toString());
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                addToHistory("Incoming message: " + new String(message.getPayload()));
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {

            }
        });

        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setAutomaticReconnect(true);
        mqttConnectOptions.setCleanSession(false);
        

        try {
            //addToHistory("Connecting to " + serverUri);
            mqttAndroidClient.connect(mqttConnectOptions, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    DisconnectedBufferOptions disconnectedBufferOptions = new DisconnectedBufferOptions();
                    disconnectedBufferOptions.setBufferEnabled(true);
                    disconnectedBufferOptions.setBufferSize(100);
                    disconnectedBufferOptions.setPersistBuffer(false);
                    disconnectedBufferOptions.setDeleteOldestMessages(false);
                    mqttAndroidClient.setBufferOpts(disconnectedBufferOptions);
                    subscribeToTopic();
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    addToHistory("Failed to connect to: " + serverUri);
                    Log.e("TAG",exception.toString());
                }
            });


        } catch (MqttException ex){
            ex.printStackTrace();
        }

    }

    private void addToHistory(String mainText){
        System.out.println("LOG: " + mainText);
        mAdapter.add(mainText);
        Snackbar.make(findViewById(android.R.id.content), mainText, Snackbar.LENGTH_LONG)
                .setAction("Action", null).show();

    }

    @Override
    public boolean onCreateOptionsMenu(Menu menu) {
        // Inflate the menu; this adds items to the action bar if it is present.

        return true;
    }

    @Override
    public boolean onOptionsItemSelected(MenuItem item) {
        // Handle action bar item clicks here. The action bar will
        // automatically handle clicks on the Home/Up button, so long
        // as you specify a parent activity in AndroidManifest.xml.
        int id = item.getItemId();

        //noinspection SimplifiableIfStatement

        return super.onOptionsItemSelected(item);
    }

    public void subscribeToTopic(){
        try {
            mqttAndroidClient.subscribe(subscriptionTopic, 0, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    addToHistory("Subscribed!");
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    addToHistory("Failed to subscribe");
                    Log.e("TAG",exception.toString());
                }
            });

            // THIS DOES NOT WORK!
            mqttAndroidClient.subscribe(subscriptionTopic, 0, new IMqttMessageListener() {
                @Override
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    // message Arrived!
                    System.out.println("Message: " + topic + " : " + new String(message.getPayload()));
                }
            });

        } catch (MqttException ex){
            System.err.println("Exception whilst subscribing");
            ex.printStackTrace();
        }
    }

    public void publishMessage(){

        try {
            MqttMessage message = new MqttMessage();
            message.setPayload(publishMessage.getBytes());
            mqttAndroidClient.publish(publishTopic, message);
            addToHistory("Message Published");
            if(!mqttAndroidClient.isConnected()){
                addToHistory(mqttAndroidClient.getBufferedMessageCount() + " messages in buffer.");
            }
        } catch (MqttException e) {
            System.err.println("Error Publishing: " + e.getMessage());
            e.printStackTrace();
        }
    }

}

使用官方demo給出的serverUri執行demo是可以連線的,但是我們需要的並不僅僅是連線啊,還需要接收訊息,所以我們需把serverUri習慣成我們的serverUri,之後就發現一直連線不上,引數等配置也都沒有問題,也找不出問題出現在哪裡。無奈之下只能讓後臺去先去對接,然而奇妙的解決方案就在此處找到了。

在服務端的demo中,可以進行傳送與接收,想到Android也是使用java語言進行開發的,所以就嘗試使用後臺的接收程式碼去代入Android的接入demo中去嘗試一次看看能不能跑通,但是神奇的,竟然跑通了,但是Android與java還是有區別的,所以不能完全招搬,要取我們需要的部分進行代入。

修改後的程式碼如下:

public class PahoExampleActivity extends AppCompatActivity{
    private HistoryAdapter mAdapter;

    MqttAndroidClient mqttAndroidClient;

//    final String serverUri = "tcp://iot.eclipse.org:1883";
//
//    String clientId = "AndroidExampleClient";
//    final String subscriptionTopic = "exampleAndroidTopic";
//    final String publishTopic = "exampleAndroidPublishTopic";
//    final String publishMessage = "Hello World!";

    final String serverUri = "tcp://xxx:1883";
    final String groupId = "xxx";
    final String topic = "xxx";
    final int qosLevel = 0;
    final Boolean cleanSession =true;
    String clientId = groupId + "@@@RECV0001";
    String accessKey = "xxx";
    String secretKey = "xxx";
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_scrolling);
        Toolbar toolbar = (Toolbar) findViewById(R.id.toolbar);
        setSupportActionBar(toolbar);

        FloatingActionButton fab = (FloatingActionButton) findViewById(R.id.fab);
        fab.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View view) {
                publishMessage();
            }
        });


        RecyclerView mRecyclerView = (RecyclerView) findViewById(R.id.history_recycler_view);
        RecyclerView.LayoutManager mLayoutManager = new LinearLayoutManager(this);
        mRecyclerView.setLayoutManager(mLayoutManager);

        mAdapter = new HistoryAdapter(new ArrayList<String>());
        mRecyclerView.setAdapter(mAdapter);

        clientId = clientId + System.currentTimeMillis();
        mqttAndroidClient = new MqttAndroidClient(getApplicationContext(), serverUri, clientId);
        mqttAndroidClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {

                if (reconnect) {
                    addToHistory("Reconnected to : " + serverURI);
                    // Because Clean Session is true, we need to re-subscribe
                    subscribeToTopic();
                } else {
                    addToHistory("Connected to: " + serverURI);
                }
                System.out.println("connect success");
            }

            @Override
            public void connectionLost(Throwable cause) {
//                addToHistory("The Connection was lost.");
                Log.e("TAG","失敗資訊"+cause.toString());
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                Log.e("TAG","接收的訊息Incoming message: " + new String(message.getPayload()));
                addToHistory("接收的訊息Incoming message: " + new String(message.getPayload()));
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {

            }
        });

        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setAutomaticReconnect(true);
        mqttConnectOptions.setCleanSession(false);
        String sign = null;
        try {
            sign = MacSignature.macSignature(clientId.split("@@@")[0], secretKey);
        } catch (InvalidKeyException e) {
            e.printStackTrace();
        } catch (NoSuchAlgorithmException e) {
            e.printStackTrace();
        }
        mqttConnectOptions.setUserName(accessKey);
        mqttConnectOptions.setPassword(sign.toCharArray());
        mqttConnectOptions.setCleanSession(cleanSession);
        mqttConnectOptions.setKeepAliveInterval(90);
        mqttConnectOptions.setAutomaticReconnect(true);
        mqttConnectOptions.setMqttVersion(MQTT_VERSION_3_1_1);




        try {
            //addToHistory("Connecting to " + serverUri);
            mqttAndroidClient.connect(mqttConnectOptions, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    DisconnectedBufferOptions disconnectedBufferOptions = new DisconnectedBufferOptions();
                    disconnectedBufferOptions.setBufferEnabled(true);
                    disconnectedBufferOptions.setBufferSize(100);
                    disconnectedBufferOptions.setPersistBuffer(false);
                    disconnectedBufferOptions.setDeleteOldestMessages(false);
                    mqttAndroidClient.setBufferOpts(disconnectedBufferOptions);
                    subscribeToTopic();
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    addToHistory("Failed to connect to: " + serverUri);
                    Log.e("TAG",exception.toString());
                }
            });


        } catch (MqttException ex){
            ex.printStackTrace();
        }

    }

    private void addToHistory(String mainText){
        System.out.println("LOG: " + mainText);
        mAdapter.add(mainText);
        Snackbar.make(findViewById(android.R.id.content), mainText, Snackbar.LENGTH_LONG)
                .setAction("Action", null).show();

    }

    @Override
    public boolean onCreateOptionsMenu(Menu menu) {
        // Inflate the menu; this adds items to the action bar if it is present.

        return true;
    }

    @Override
    public boolean onOptionsItemSelected(MenuItem item) {
        // Handle action bar item clicks here. The action bar will
        // automatically handle clicks on the Home/Up button, so long
        // as you specify a parent activity in AndroidManifest.xml.
        int id = item.getItemId();

        //noinspection SimplifiableIfStatement

        return super.onOptionsItemSelected(item);
    }

    public void subscribeToTopic(){
        try {
            mqttAndroidClient.subscribe(topic, 0, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    addToHistory("Subscribed!");
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    addToHistory("Failed to subscribe");
                    Log.e("TAG",exception.toString());
                }
            });

            // THIS DOES NOT WORK!
            mqttAndroidClient.subscribe(topic, 0, new IMqttMessageListener() {
                @Override
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    // message Arrived!
                    System.out.println("Message: " + topic + " : " + new String(message.getPayload()));
                    addToHistory("Message: " + topic + " : " + new String(message.getPayload()));
                }
            });

        } catch (MqttException ex){
            System.err.println("Exception whilst subscribing");
            ex.printStackTrace();
        }
    }

    public void publishMessage(){

        try {
            MqttMessage message = new MqttMessage();
            message.setPayload(topic.getBytes());
            mqttAndroidClient.publish(topic, message);
            addToHistory("Message Published");
            if(!mqttAndroidClient.isConnected()){
                Log.e("TAG","接收");
                addToHistory(mqttAndroidClient.getBufferedMessageCount() + " messages in buffer.");
            }
        } catch (MqttException e) {
            System.err.println("Error Publishing: " + e.getMessage());
            e.printStackTrace();
        }
    }

}

下面對修改後的引數進行說明一下:

1、serverUri 伺服器的域名加埠 例如:tcp://xxx:1883 因為不能洩露一些資料,所以使用xxx代替了,具體的地址則是在MQTT管理----》例項管理下的接入點域名,複製的接入點域名即是xxx了,注意,tcp還是需要的哦,接入點域名只是xxx的那一部分。

2、groupId  groupId可在 MQTT管理----》Group Id管理中獲取到,這個id是需要自己建立的。

3、topic 主題(注意:主題是需要一致的)

4、clientId 客戶端id,clientId 是由groupId [email protected]@@+裝置名。裝置名可以隨便取,但是客戶端id有長度限制,具體長度限制可檢視官方文件。

5、accessKey 與secretKey  這兩也是需要自己建立,在建立完成後需要儲存好。

以上程式碼能進行接收的主要區別是以下這段程式碼:

String sign = null;
try {
    sign = MacSignature.macSignature(clientId.split("@@@")[0], secretKey);
} catch (InvalidKeyException e) {
    e.printStackTrace();
} catch (NoSuchAlgorithmException e) {
    e.printStackTrace();
}
mqttConnectOptions.setUserName(accessKey);
mqttConnectOptions.setPassword(sign.toCharArray());
mqttConnectOptions.setCleanSession(cleanSession);
mqttConnectOptions.setKeepAliveInterval(90);
mqttConnectOptions.setAutomaticReconnect(true);
mqttConnectOptions.setMqttVersion(MQTT_VERSION_3_1_1);

如果把這段程式碼去掉,也是會連線不上,會不斷進行重連。

修改後的接收程式碼是可以進行接收了,但是有出現了另外一個問題。之前不是說使用服務端的的接收程式碼是可以在java的main函式中直接進行接收與傳送的。但是使用以上程式碼進行接收之後,服務端的傳送程式碼在Android上是接收不到的,但是ios就可以接收的了訊息。不過雖然後臺的傳送程式碼傳送的訊息是接收不到,但是很奇怪的是直接在阿里雲上的生產者那裡進行訊息的傳送是可以進行訊息的接收的,暫時也沒找到問題的所在,所以就直接讓後臺使用生產者的傳送示例程式碼進行訊息的傳送,這樣就解決了,同時ios與Android都能同時接收到訊息了。生產者的傳送示例程式碼我就不貼出來了,可以在MQTT控制檯上找到,如圖所示:

這樣就實現了MQTT的對接了,需要運用到我們的專案中,所以我對程式碼進行了一下整理:

final String serverUri = "tcp://post-cn-mp90v8ybm08.mqtt.aliyuncs.com:1883";
final String groupId = "GID_fengniao";
final String topic = "test_test123";
final int qosLevel = 0;
final Boolean cleanSession =true;
private String clientId = groupId + "@@@RECV0001";
private String accessKey = "LTAIXJ4HbeIgsCFi";
private String secretKey = "QdayGSk6FcJr3MvH0roT2MymUlxoW6";
private MqttAndroidClient mqttAndroidClient;
public void startMQTTService(){
    clientId = clientId + System.currentTimeMillis();
    mqttAndroidClient = new MqttAndroidClient(getApplicationContext(), serverUri, clientId);
    mqttAndroidClient.setCallback(new MqttCallbackExtended() {
        @Override
        public void connectComplete(boolean reconnect, String serverURI) {

            if (reconnect) {
                LogUtils.e("Reconnected to : " + serverURI);
                // Because Clean Session is true, we need to re-subscribe
                subscribeToTopic();
            } else {
                LogUtils.e("Connected to: " + serverURI);
            }
        }

        @Override
        public void connectionLost(Throwable cause) {
            LogUtils.e("The Connection was lost.");
            LogUtils.e("MQTT失敗資訊-->"+cause.toString());
        }

        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            LogUtils.e("接收的訊息Incoming message: " + new String(message.getPayload()));

        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {

        }
    });

    MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
    mqttConnectOptions.setAutomaticReconnect(true);
    mqttConnectOptions.setCleanSession(false);
    String sign = null;
    try {
        sign = MacSignature.macSignature(clientId.split("@@@")[0], secretKey);
    } catch (InvalidKeyException e) {
        e.printStackTrace();
    } catch (NoSuchAlgorithmException e) {
        e.printStackTrace();
    }
    mqttConnectOptions.setUserName(accessKey);
    mqttConnectOptions.setPassword(sign.toCharArray());
    mqttConnectOptions.setCleanSession(cleanSession);
    mqttConnectOptions.setKeepAliveInterval(90);
    mqttConnectOptions.setAutomaticReconnect(true);
    mqttConnectOptions.setMqttVersion(MQTT_VERSION_3_1_1);




    try {
        //addToHistory("Connecting to " + serverUri);
        mqttAndroidClient.connect(mqttConnectOptions, null, new IMqttActionListener() {
            @Override
            public void onSuccess(IMqttToken asyncActionToken) {
                DisconnectedBufferOptions disconnectedBufferOptions = new DisconnectedBufferOptions();
                disconnectedBufferOptions.setBufferEnabled(true);
                disconnectedBufferOptions.setBufferSize(100);
                disconnectedBufferOptions.setPersistBuffer(false);
                disconnectedBufferOptions.setDeleteOldestMessages(false);
                mqttAndroidClient.setBufferOpts(disconnectedBufferOptions);
                subscribeToTopic();
            }

            @Override
            public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                LogUtils.e("Failed to connect to: " + serverUri);
                LogUtils.e(exception.toString());
            }
        });


    } catch (MqttException ex){
        ex.printStackTrace();
    }
}

public void subscribeToTopic(){
    try {
        mqttAndroidClient.subscribe(topic, 0, null, new IMqttActionListener() {
            @Override
            public void onSuccess(IMqttToken asyncActionToken) {
                Log.e("TAG","Subscribed!");
            }

            @Override
            public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                LogUtils.e("Failed to subscribe");
                LogUtils.e(exception.toString());
            }
        });

        // THIS DOES NOT WORK!
        mqttAndroidClient.subscribe(topic, 0, new IMqttMessageListener() {
            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                // message Arrived!
                LogUtils.e("Message: " + topic + " : " + new String(message.getPayload()));
                String money= new String(message.getPayload());
                if (!TextUtils.isEmpty(money)) {
                    if (OENType.currentType() == OENType.fengniao) {
                        VoiceSpeaker.getInstance().speak(VoiceTemplate.getDefaultTemplate(money));
                    }
                }
            }
        });

    } catch (MqttException ex){
        LogUtils.e("Exception whilst subscribing");
        ex.printStackTrace();
    }
}

啟動服務則只有在onCreate()方法中呼叫startMQTTService()方法即可。

五、org.eclipse.paho.android.service的使用方式注意事項

1、使用匯入library的方式

2、直接複製程式碼的方式

如果使用了方式2,則需要注意在MqttAndroidClient這個類中把自己的服務的名字改成自己的MqttService這個java類的位置,如圖所示:

這樣,MQTT的Android就完成了,接下來就是實現自己的播報部分了。

因為阿里雲的MQTT在網上沒有多少文件,所以特地寫了下總結,希望對初次接入MQTT的同行有幫助。