1. 程式人生 > >MQTT Java客戶端Eclipse paho實現資料的傳送和接收

MQTT Java客戶端Eclipse paho實現資料的傳送和接收

MQTT(MQ Telemetry Transport)是IBM開發的一種網路應用層的協議

使用場景:

1、不可靠、網路頻寬小的網路

2、執行的裝置CPU、記憶體非常有限

特點:

1、基於釋出/訂閱模型的協議

2、他是二進位制協議,二進位制的特點就是緊湊、佔用空間小。他的協議頭只有2個位元組

3、提供了三種訊息可能性保障:最多一次 0、最少一次 1、只有一次 2

maven依賴

<dependency>
	<groupId>org.eclipse.paho</groupId>
	<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
	<version>1.1.1</version>
</dependency>

傳送訊息示例

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 
/**
 * 傳送資料到mqtt伺服器
 * @author:塗有
 * @date 2017年8月16日 下午11:15:22
 */
public class PubMsg {
	private static int qos = 2; //只有一次
	private static String broker = "tcp://10.100.124.206:1883";
	private static String userName = "tuyou";
	private static String passWord = "tuyou";
 
	
	private static MqttClient connect(String clientId,String userName,
			String password) throws MqttException {
		MemoryPersistence persistence = new MemoryPersistence();
		MqttConnectOptions connOpts = new MqttConnectOptions();
		connOpts.setCleanSession(true);
		connOpts.setUserName(userName);
		connOpts.setPassword(password.toCharArray());
		connOpts.setConnectionTimeout(10);
		connOpts.setKeepAliveInterval(20);
//		String[] uris = {"tcp://10.100.124.206:1883","tcp://10.100.124.207:1883"};
//		connOpts.setServerURIs(uris);  //起到負載均衡和高可用的作用
		MqttClient mqttClient = new MqttClient(broker, clientId, persistence);
		mqttClient.setCallback(new PushCallback("test"));
		mqttClient.connect(connOpts);
		return mqttClient;
	}
 
	private static void pub(MqttClient sampleClient, String msg,String topic) 
			throws MqttPersistenceException, MqttException {
		MqttMessage message = new MqttMessage("ertwersdfas".getBytes());
		message.setQos(qos);
		message.setRetained(false);
		sampleClient.publish(topic, message);
	}
	
	private static void publish(String str,String clientId,String topic) throws MqttException{
		MqttClient mqttClient = connect(clientId,userName,passWord);
 
		if (mqttClient != null) {
			pub(mqttClient, str, topic);
			System.out.println("pub-->" + str);
		}
 
		if (mqttClient != null) {
			mqttClient.disconnect();
		}
	}
 
	public static void main(String[] args) throws MqttException {
		publish("message content","client-id-0","$share/edge/server/public/a");
	}
}
 
class PushCallback implements MqttCallback {
	private String threadId;
	public PushCallback(String threadId){
		this.threadId = threadId;
	}
	
    public void connectionLost(Throwable cause) {
    	
    }
 
    public void deliveryComplete(IMqttDeliveryToken token) {
//       System.out.println("deliveryComplete---------" + token.isComplete());
    }
 
    public void messageArrived(String topic, MqttMessage message) throws Exception {
    	String msg = new String(message.getPayload());
    	System.out.println(threadId + " " + msg);
    }
}

 消費訊息示例

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 
public class SubMsg {
 
//	 private static String topic = "$share/testgroup/wyptest1";
//	 private static String topic = "$queue/wyptest1";
//	 private static String topic = "wyptest1";
     private static int qos = 2;
     private static String broker = "tcp://10.100.124.207:1883";
     private static String userName = "tuyou";
     private static String passWord = "tuyou";
    
 
     private static MqttClient connect(String clientId) throws MqttException{
    	 MemoryPersistence persistence = new MemoryPersistence();
    	 MqttConnectOptions connOpts = new MqttConnectOptions();
//    	 String[] uris = {"tcp://10.100.124.206:1883","tcp://10.100.124.206:1883"};
    	 connOpts.setCleanSession(false);
         connOpts.setUserName(userName);
         connOpts.setPassword(passWord.toCharArray());
         connOpts.setConnectionTimeout(10);
         connOpts.setKeepAliveInterval(20);
//         connOpts.setServerURIs(uris);
//         connOpts.setWill(topic, "close".getBytes(), 2, true);
         MqttClient mqttClient = new MqttClient(broker, clientId, persistence);
         mqttClient.connect(connOpts);
    	 return mqttClient;
     }
     
     public static void sub(MqttClient mqttClient,String topic) throws MqttException{
         int[] Qos  = {qos};
         String[] topics = {topic};
         mqttClient.subscribe(topics, Qos);
     }
     
     
    private static void runsub(String clientId, String topic) throws MqttException{
    	MqttClient mqttClient = connect(clientId);
    	if(mqttClient != null){
			sub(mqttClient,topic);
    	}
    }
    public static void main(String[] args) throws MqttException{
    	
		runsub("client-id-1", "$share/testgroupa/edge/server/private/+");
    }
}