MQTT Java客戶端Eclipse paho實現資料的傳送和接收
阿新 • • 發佈:2018-12-09
MQTT(MQ Telemetry Transport)是IBM開發的一種網路應用層的協議
使用場景:
1、不可靠、網路頻寬小的網路
2、執行的裝置CPU、記憶體非常有限
特點:
1、基於釋出/訂閱模型的協議
2、它是二進位制協議,二進位制的特點就是緊湊、佔用空間小。它的固定報頭最小只有2個位元組
3、提供了三種訊息傳遞的可靠性保障(QoS):最多一次(QoS 0)、至少一次(QoS 1)、只有一次(QoS 2)
maven依賴
<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.1.1</version> </dependency>
傳送訊息示例
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttPersistenceException; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; /** * 傳送資料到mqtt伺服器 * @author:塗有 * @date 2017年8月16日 下午11:15:22 */ public class PubMsg { private static int qos = 2; //只有一次 private static String broker = "tcp://10.100.124.206:1883"; private static String userName = "tuyou"; private static String passWord = "tuyou"; private static MqttClient connect(String clientId,String userName, String password) throws MqttException { MemoryPersistence persistence = new MemoryPersistence(); MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setCleanSession(true); connOpts.setUserName(userName); connOpts.setPassword(password.toCharArray()); connOpts.setConnectionTimeout(10); connOpts.setKeepAliveInterval(20); // String[] uris = {"tcp://10.100.124.206:1883","tcp://10.100.124.207:1883"}; // connOpts.setServerURIs(uris); //起到負載均衡和高可用的作用 MqttClient mqttClient = new MqttClient(broker, clientId, persistence); mqttClient.setCallback(new PushCallback("test")); mqttClient.connect(connOpts); return mqttClient; } private static void pub(MqttClient sampleClient, String msg,String topic) throws MqttPersistenceException, MqttException { MqttMessage message = new MqttMessage("ertwersdfas".getBytes()); message.setQos(qos); message.setRetained(false); sampleClient.publish(topic, message); } private static void publish(String str,String clientId,String topic) throws MqttException{ MqttClient mqttClient = connect(clientId,userName,passWord); if (mqttClient != null) { pub(mqttClient, str, topic); System.out.println("pub-->" + str); } if (mqttClient != null) { mqttClient.disconnect(); } } public 
static void main(String[] args) throws MqttException { publish("message content","client-id-0","$share/edge/server/public/a"); } } class PushCallback implements MqttCallback { private String threadId; public PushCallback(String threadId){ this.threadId = threadId; } public void connectionLost(Throwable cause) { } public void deliveryComplete(IMqttDeliveryToken token) { // System.out.println("deliveryComplete---------" + token.isComplete()); } public void messageArrived(String topic, MqttMessage message) throws Exception { String msg = new String(message.getPayload()); System.out.println(threadId + " " + msg); } }
消費訊息示例
import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class SubMsg { // private static String topic = "$share/testgroup/wyptest1"; // private static String topic = "$queue/wyptest1"; // private static String topic = "wyptest1"; private static int qos = 2; private static String broker = "tcp://10.100.124.207:1883"; private static String userName = "tuyou"; private static String passWord = "tuyou"; private static MqttClient connect(String clientId) throws MqttException{ MemoryPersistence persistence = new MemoryPersistence(); MqttConnectOptions connOpts = new MqttConnectOptions(); // String[] uris = {"tcp://10.100.124.206:1883","tcp://10.100.124.206:1883"}; connOpts.setCleanSession(false); connOpts.setUserName(userName); connOpts.setPassword(passWord.toCharArray()); connOpts.setConnectionTimeout(10); connOpts.setKeepAliveInterval(20); // connOpts.setServerURIs(uris); // connOpts.setWill(topic, "close".getBytes(), 2, true); MqttClient mqttClient = new MqttClient(broker, clientId, persistence); mqttClient.connect(connOpts); return mqttClient; } public static void sub(MqttClient mqttClient,String topic) throws MqttException{ int[] Qos = {qos}; String[] topics = {topic}; mqttClient.subscribe(topics, Qos); } private static void runsub(String clientId, String topic) throws MqttException{ MqttClient mqttClient = connect(clientId); if(mqttClient != null){ sub(mqttClient,topic); } } public static void main(String[] args) throws MqttException{ runsub("client-id-1", "$share/testgroupa/edge/server/private/+"); } }