1. 程式人生 > >springboot如何整合mqtt訊息推送

springboot如何整合mqtt訊息推送

1.需求分析

    近期筆者的專案需要使用 MQTT 實現訊息推送,筆者選擇 EMQ 作為 MQTT 伺服器。上篇文章講解了如何在 Linux 中安裝 MQTT 服務(安裝連結:https://blog.csdn.net/zhangxing52077/article/details/80567270 ),接下來筆者將講解如何在 Spring Boot 中整合 MQTT。

2.實現方案

①pom依賴

<!-- MQTT: Spring Integration starter, stream support and the MQTT adapter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

②yml中配置mqtt(自定義配置)

# MQ configuration (custom keys under the "com.mqtt" prefix)
com:
  mqtt:
    # Broker address; "ip" is presumably a placeholder for the real host - confirm
    host: tcp://ip:1883
    # Client identifier
    clientid: mqttjs_e8022a4d0b
    # Topic names; looks like a comma-separated list - confirm how the consumer splits it
    topic: good,test,yes
    # Credentials (sample values)
    username: zhangxing
    password: zxp52077
    # NOTE(review): units are not stated here - presumably seconds; confirm
    timeout: 10
    keepalive: 20

③建立mqtt訊息屬性配置類

/**
 * Holds the MQTT settings bound from configuration properties under the
 * {@code com.mqtt} prefix.
 *
 * <p>Getters and setters for every field are generated by Lombok.
 */
@Component
@ConfigurationProperties(prefix = "com.mqtt")
@Getter
@Setter
public class MqttConfiguration {

    /** Broker address. */
    private String host;

    /** Client identifier. */
    private String clientid;

    /** Topic setting as a single string. */
    private String topic;

    /** User name used to authenticate. */
    private String username;

    /** Password used to authenticate. */
    private String password;

    /** Timeout value. NOTE(review): unit not shown here, presumably seconds; confirm. */
    private int timeout;

    /** Keep-alive value. NOTE(review): unit not shown here, presumably seconds; confirm. */
    private int keepalive;
}

④建立mqtt訊息推送實體

/**
 * Message body that is pushed over MQTT.
 *
 * <p>Instances are normally created through {@link Builder}; {@link #toString()} renders the
 * payload as a JSON string. Getters and setters are generated by Lombok.
 */
@Setter
@Getter
public class PushPayload {

    /** Push type. */
    private String type;

    /** Push target. */
    private String mobile;

    /** Title. */
    private String title;

    /** Content. */
    private String content;

    /** Count; defaults to 1. */
    private Integer badge = 1;

    /** Sound; defaults to {@code "default"}. */
    private String sound = "default";

    /**
     * Creates a payload with every field supplied explicitly.
     *
     * @param type    push type
     * @param mobile  push target
     * @param title   title
     * @param content content
     * @param badge   count
     * @param sound   sound
     */
    public PushPayload(String type, String mobile, String title, String content, Integer badge, String sound) {
        this.type = type;
        this.mobile = mobile;
        this.title = title;
        this.content = content;
        this.badge = badge;
        this.sound = sound;
    }

    /**
     * Fluent builder for {@link PushPayload}. Fields that are not set keep their defaults
     * ({@code badge = 1}, {@code sound = "default"}, everything else {@code null}).
     */
    public static class Builder {

        /** Push type. */
        private String type;

        /** Push target. */
        private String mobile;

        /** Title. */
        private String title;

        /** Content. */
        private String content;

        /** Count; defaults to 1. */
        private Integer badge = 1;

        /** Sound; defaults to {@code "default"}. */
        private String sound = "default";

        public Builder setType(String type) {
            this.type = type;
            return this;
        }

        public Builder setMobile(String mobile) {
            this.mobile = mobile;
            return this;
        }

        public Builder setTitle(String title) {
            this.title = title;
            return this;
        }

        public Builder setContent(String content) {
            this.content = content;
            return this;
        }

        public Builder setBadge(Integer badge) {
            this.badge = badge;
            return this;
        }

        public Builder setSound(String sound) {
            this.sound = sound;
            return this;
        }

        /**
         * Creates the payload from the values collected so far.
         *
         * @return a new {@link PushPayload}
         */
        public PushPayload build() {
            return new PushPayload(type, mobile, title, content, badge, sound);
        }

        /**
         * Misspelled predecessor of {@link #build()}, kept so existing callers keep compiling.
         *
         * @return a new {@link PushPayload}
         */
        public PushPayload bulid() {
            return build();
        }
    }

    /**
     * Starts a new builder.
     *
     * @return an empty {@link Builder}
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Misspelled predecessor of {@link #builder()}, kept so existing callers keep compiling.
     *
     * @return an empty {@link Builder}
     */
    public static Builder getPushPayloadBuider() {
        return builder();
    }

    /**
     * Renders this payload as a JSON string.
     *
     * @return the JSON representation produced by {@code JSON.toJSONString}
     */
    @Override
    public String toString() {
        return JSON.toJSONString(this, SerializerFeature.DisableCircularReferenceDetect);
    }
}

⑤建立mqtt訊息推送或訂閱客戶端

@Slf4j
public class MqttPushClient {

    private MqttClient client;
    private static volatile MqttPushClient mqttPushClient = null;
    public static MqttPushClient getInstance(){

        if(null == mqttPushClient){
            synchronized (MqttPushClient.class){
                if(null == mqttPushClient){
                    mqttPushClient = new MqttPushClient();
}
            }

        }
        return mqttPushClient;
}

    private MqttPushClient() {
        connect();
}

    private void connect(){
        try {
            client = new MqttClient(PropertiesUtil.MQTT_HOST, PropertiesUtil.MQTT_CLIENTID, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
options.setUserName(PropertiesUtil.MQTT_USER_NAME);
options.setPassword(PropertiesUtil.MQTT_PASSWORD.toCharArray());
options.setConnectionTimeout(PropertiesUtil.MQTT_TIMEOUT);
options.setKeepAliveInterval(PropertiesUtil.MQTT_KEEP_ALIVE);
            try {
                client.setCallback(new PushCallback());
client.connect(options);
} catch (Exception e) {
                e.printStackTrace();
}
        } catch (Exception e) {
            e.printStackTrace();
}
    }

    /**
     * 釋出,預設qos0,非持久化
* @param topic
* @param pushMessage
*/
public void publish(String topic,PushPayload pushMessage){
        publish(0, false, topic, pushMessage);
}

    /**
     * 釋出
* @param qos
* @param retained
* @param topic
* @param pushMessage
*/
public void publish(int qos,boolean retained,String topic,PushPayload pushMessage){
        MqttMessage message = new MqttMessage();
message.setQos(qos);
message.setRetained(retained);
message.setPayload(pushMessage.toString().getBytes());
MqttTopic mTopic = client.getTopic(topic);
        if(null == mTopic){
            log.error("topic not exist");
}
        MqttDeliveryToken token;
        try {
            token = mTopic.publish(message);
token.waitForCompletion();
} catch (MqttPersistenceException e) {
            e.printStackTrace();
} catch (MqttException e) {
            e.printStackTrace();
}
    }

    /**
     * 訂閱某個主題,qos預設為0
     * @param topic
*/
public void subscribe(String topic){
        subscribe(topic,0);
}

    /**
     * 訂閱某個主題
* @param topic
* @param qos
*/
public void subscribe(String topic,int qos){
        try {
            client.subscribe(topic, qos);
} catch (MqttException e) {
            e.printStackTrace();
}
    }


    public static void main(String[] args) throws Exception {
        String kdTopic = "good";
PushPayload pushMessage = PushPayload.getPushPayloadBuider().setMobile("15345715326")
                 .setContent("designModel")
                .bulid();
MqttPushClient.getInstance().publish(0, false, kdTopic, pushMessage);}
}

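⑥配置獲取類的編寫(注意:此類讀取的是 classpath 下的 mqtt.yml,鍵名為 MQTT_HOST、MQTT_CLIENTID、MQTT_USER_NAME、MQTT_PASSWORD、MQTT_TIMEOUT、MQTT_KEEP_ALIVE,而不是步驟②中以 com.mqtt 為字首的配置;另外還需要 elasticsearch.properties,其中包含 ES_HOST、ES_PORT、ES_CLUSTER_NAME)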

/**
 * Reads MQTT and Elasticsearch settings from classpath resources when the class is
 * initialised and exposes them as static fields.
 *
 * <p>Each resource is parsed once with {@link Properties}. A missing resource, an unreadable
 * resource or a missing integer key aborts class initialisation with a descriptive
 * {@link IllegalStateException}.
 */
public class PropertiesUtil {

    /** Classpath resource that holds the {@code MQTT_*} keys; parsed with {@link Properties}. */
    private static final String MQTT_RESOURCE = "/mqtt.yml";

    /** Classpath resource that holds the {@code ES_*} keys. */
    private static final String ES_RESOURCE = "/elasticsearch.properties";

    public static String MQTT_HOST;
    public static String MQTT_CLIENTID;
    public static String MQTT_USER_NAME;
    public static String MQTT_PASSWORD;
    public static int MQTT_TIMEOUT;
    public static int MQTT_KEEP_ALIVE;
    public static final String ELASTIC_SEARCH_HOST;
    public static final int ELASTIC_SEARCH_PORT;
    public static final String ELASTIC_SEARCH_CLUSTER_NAME;

    static {
        // Load once and reuse, instead of re-reading the resource for every key.
        Properties mqtt = loadMqttProperties();
        MQTT_HOST = mqtt.getProperty("MQTT_HOST");
        MQTT_CLIENTID = mqtt.getProperty("MQTT_CLIENTID");
        MQTT_USER_NAME = mqtt.getProperty("MQTT_USER_NAME");
        MQTT_PASSWORD = mqtt.getProperty("MQTT_PASSWORD");
        MQTT_TIMEOUT = requireInt(mqtt, "MQTT_TIMEOUT", MQTT_RESOURCE);
        MQTT_KEEP_ALIVE = requireInt(mqtt, "MQTT_KEEP_ALIVE", MQTT_RESOURCE);
    }

    static {
        Properties es = loadEsProperties();
        ELASTIC_SEARCH_HOST = es.getProperty("ES_HOST");
        ELASTIC_SEARCH_PORT = requireInt(es, "ES_PORT", ES_RESOURCE);
        ELASTIC_SEARCH_CLUSTER_NAME = es.getProperty("ES_CLUSTER_NAME");
    }

    /** Loads the MQTT settings resource. */
    private static Properties loadMqttProperties() {
        return loadProperties(MQTT_RESOURCE);
    }

    /** Loads the Elasticsearch settings resource. */
    private static Properties loadEsProperties() {
        return loadProperties(ES_RESOURCE);
    }

    /**
     * Parses a classpath resource with {@link Properties#load(InputStream)}.
     *
     * @param resourcePath absolute classpath path of the resource
     * @return the parsed properties
     * @throws IllegalStateException if the resource is missing or cannot be read
     */
    private static Properties loadProperties(String resourcePath) {
        try (InputStream in = PropertiesUtil.class.getResourceAsStream(resourcePath)) {
            if (in == null) {
                // getResourceAsStream returns null for a missing resource; fail with its name
                // rather than a bare NullPointerException from Properties.load.
                throw new IllegalStateException("Classpath resource not found: " + resourcePath);
            }
            Properties properties = new Properties();
            properties.load(in);
            return properties;
        } catch (IOException e) {
            throw new IllegalStateException("Failed to read classpath resource: " + resourcePath, e);
        }
    }

    /**
     * Reads a mandatory integer property.
     *
     * @param properties   parsed properties
     * @param key          property name
     * @param resourcePath resource the properties came from, used in the error message
     * @return the parsed value
     * @throws IllegalStateException if the key is absent
     * @throws NumberFormatException if the value is not a valid integer
     */
    private static int requireInt(Properties properties, String key, String resourcePath) {
        String raw = properties.getProperty(key);
        if (raw == null) {
            throw new IllegalStateException("Missing required property '" + key + "' in " + resourcePath);
        }
        // Properties keeps trailing whitespace in values; trim so "10 " still parses.
        return Integer.parseInt(raw.trim());
    }
}

⑦mqtt推送回調類

/**
 * @auther zx
 * @date 2018/5/28 9:20
 */
public class PushCallback implements MqttCallback {

    public void connectionLost(Throwable cause) {
        // 連線丟失後,一般在這裡面進行重連
System.out.println("連線斷開,可以做重連");
}

    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete---------" + token.isComplete());
}

    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // subscribe後得到的訊息會執行到這裡面
System.out.println("接收訊息主題 : " + topic);
System.out.println("接收訊息Qos : " + message.getQos());
System.out.println("接收訊息內容 : " + new String(message.getPayload()));
}
}

3.效果測試

/**
 * Builds a sample payload (content "test", mobile "119", type "2018") and hands it to
 * {@code mqttClientComponent.push} with "yes" as the first argument.
 */
@Test
public void test() {
    PushPayload.Builder payloadBuilder = PushPayload.getPushPayloadBuider();
    payloadBuilder.setContent("test");
    payloadBuilder.setMobile("119");
    payloadBuilder.setType("2018");
    PushPayload samplePayload = payloadBuilder.bulid();

    // NOTE(review): mqttClientComponent and its push(...) method are not defined in the
    // snippets shown here - presumably an injected wrapper around the MQTT client; confirm.
    mqttClientComponent.push("yes", samplePayload);
}

mqtt客戶端效果顯示


好了,至此mqtt的整合已完結

我是張星,歡迎加入博主技術交流群,群號:313145288