1. 程式人生 > >msquitto在java中的應用,一個小Demo

msquitto在java中的應用,一個小Demo

訊息釋出者

package com.gaofei.utils;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
 * 釋出服務
 * @author gaofei
 *
 */
public class ServerMQTT {//服務端MQTT
	 //tcp://MQTT安裝的伺服器地址:MQTT定義的埠號  
    public static final String HOST = "tcp://localhost:1883";  
    //定義一個主題  
    public static final String TOPIC = "pos_message_all";  
    //定義MQTT的ID,可以在MQTT服務配置中指定  
    private static final String clientid = "server11";  

    private MqttClient client;  //定義MQTT釋出者客戶端物件
    private MqttTopic topic11;  //釋出主題
    private String userName = "admin";  //非必須-使用者名稱
    private String passWord = "123456";  //非必須-密碼

    private MqttMessage message;//釋出者釋出的訊息

	public ServerMQTT() throws MqttException {
		  // 初始化與伺服器連線,Url/cliendId/MemoryPersistence設定clientid的儲存形式,預設為以記憶體儲存  
		client=new MqttClient(HOST,clientid,new MemoryPersistence());
		connect();
	}  
	private void connect() {  
        MqttConnectOptions options = new MqttConnectOptions(); //連線伺服器支援的選項設定 
        options.setCleanSession(false);//每次會話完畢session儲存  
        options.setUserName(userName);  
        options.setPassword(passWord.toCharArray());  
        // 設定超時時間  
        options.setConnectionTimeout(10);  
        // 設定會話心跳時間  
        options.setKeepAliveInterval(20); //時間為 20*1.5s傳送一次
        try {  
            client.setCallback(new PushCallback());//處理髮布的訊息  
            client.connect(options);  //客戶端連線上

            topic11 = client.getTopic(TOPIC);  //獲取主題
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    } 
	   /** 
     * 釋出訊息
     * @param topic 
     * @param message 
     * @throws MqttPersistenceException 
     * @throws MqttException 
     */  
    public void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException,  
            MqttException {  
        MqttDeliveryToken token = topic.publish(message); //釋出訊息 
        token.waitForCompletion(); //等待完成 
        System.out.println("message is published completely! "  
                + token.isComplete());  
    }  
    /** 
     *  啟動入口 
     * @param args 
     * @throws MqttException 
     */  
    public static void main(String[] args) throws MqttException {  
        ServerMQTT server = new ServerMQTT();//建立MQTT釋出客戶端物件  
        server.message = new MqttMessage();  //建立Mqqt訊息物件
        server.message.setQos(1);  //設定服務質量QOS:1保證訊息至少能到達一次
        server.message.setRetained(true);  
        server.message.setPayload("這是推送訊息的內容".getBytes());  
        server.publish(server.topic11 , server.message);  
        System.out.println(server.message.isRetained() + "------ratained狀態");  
    }  
}

訊息訂閱者

package com.gaofei.utils;

import java.util.concurrent.ScheduledExecutorService;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * 訂閱服務
 * 模擬一個客戶端訂閱服務
 * @author rao
 *
 */
public class ClientMQTT {
	  public static final String HOST = "tcp://localhost:1883";  
	    public static final String TOPIC1 = "pos_message_all";  
	    private static final String clientid = "client11";  
	    private MqttClient client;  
	    private MqttConnectOptions options;  
	    private String userName = "admin";    //非必須
	    private String passWord = "123456";  //非必須
	    @SuppressWarnings("unused")
	    private ScheduledExecutorService scheduler;  

	    private void start() {  
	        try {  
	            // host為主機名,clientid即連線MQTT的客戶端ID,一般以唯一識別符號表示,MemoryPersistence設定clientid的儲存形式,預設為以記憶體儲存  
	            client = new MqttClient(HOST, clientid, new MemoryPersistence());  
	            // MQTT的連線設定  
	            options = new MqttConnectOptions();  
	            // 設定是否清空session,這裡如果設定為false表示伺服器會保留客戶端的連線記錄,設定為true表示每次連線到伺服器都以新的身份連線  
	            options.setCleanSession(false);  
	            // 設定連線的使用者名稱  
	            options.setUserName(userName);  
	            // 設定連線的密碼  
	            options.setPassword(passWord.toCharArray());  
	            // 設定超時時間 單位為秒  
	            options.setConnectionTimeout(10);  
	            // 設定會話心跳時間 單位為秒 伺服器會每隔1.5*20秒的時間向客戶端傳送個訊息判斷客戶端是否線上,但這個方法並沒有重連的機制  
	            options.setKeepAliveInterval(20);  
	            // 設定回撥  
	            client.setCallback(new PushCallback());  
	            MqttTopic topic = client.getTopic(TOPIC1);  
	            //setWill方法,如果專案中需要知道客戶端是否掉線可以呼叫該方法。設定最終埠的通知訊息  
	            //遺囑       
	            options.setWill(topic, "close".getBytes(), 2, true);  
	            client.connect(options);  
	            //訂閱訊息  
	            int[] Qos  = {1};  
	            String[] topic1 = {TOPIC1};  
	            client.subscribe(topic1, Qos);  //訂閱主題

	        } catch (Exception e) {  
	            e.printStackTrace();  
	        }  
	    }  

	    public static void main(String[] args) throws MqttException {  
	        ClientMQTT client = new ClientMQTT();  
	        client.start();  
	    }  

}

訊息的處理

package com.gaofei.utils;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class PushCallback implements MqttCallback {

	@Override
	public void connectionLost(Throwable arg0) {
		  // 連線丟失後,一般在這裡面進行重連  
        System.out.println("連線斷開,可以做重連");  
		
	}

	@Override
	public void deliveryComplete(IMqttDeliveryToken token) {//訊息傳送完
		 System.out.println("deliveryComplete---------" + token.isComplete()); 
		
	}

	@Override
	public void messageArrived(String topic, MqttMessage message) throws Exception {
		 // subscribe後得到的訊息會執行到這裡面  
        System.out.println("接收訊息主題 : " + topic);  
        System.out.println("接收訊息Qos : " + message.getQos());  
        System.out.println("接收訊息內容 : " + new String(message.getPayload()));

		
	}

}