1. 程式人生 > >SpringBoot2.0整合MQTT功能之訊息訂閱處理

SpringBoot2.0整合MQTT功能之訊息訂閱處理

       距離上一篇【SpringBoot2.0整合MQTT訊息推送功能】https://blog.csdn.net/qq_41018959/article/details/80592444部落格已經過去有一段時間了,最近比較忙,也沒時間整理【SpringBoot2.0整合MQTT訊息之訊息訂閱處理】篇章,剛好早上閒下來,就趁這個機會整理一下。

       網上資料還是蠻多的,但也不是很全面,比如如何設定多個client,如何監聽不同topic等,好了,廢話不多說,還是跟上篇一樣的環境,上程式碼:

        第一,pom配置,引入相關jar:

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.integration</groupId>
   <artifactId>spring-integration-mqtt</artifactId>
</dependency>    

第二,配置MQTT伺服器基本資訊,在springBoot配置檔案application.properties中配置,新增如下:

#MQTT配置資訊
#MQTT-使用者名稱
spring.mqtt.username=admin
#MQTT-密碼
spring.mqtt.password=password
#MQTT-伺服器連線地址,如果有多個,用逗號隔開,如:tcp://127.0.0.1:61613,tcp://192.168.2.133:61613
spring.mqtt.url=tcp://127.0.0.1:61613
#MQTT-連線伺服器預設客戶端ID
spring.mqtt.client.id=mqttId
#MQTT-預設的訊息推送主題,實際可在呼叫介面時指定
spring.mqtt.default.topic=topic
#連線超時
spring.mqtt.completionTimeout=3000

第三,配置MQTT訊息接收處理類:

/**
 * 〈一句話功能簡述〉<br> 
 * 〈MQTT接收訊息處理〉
 *
 * @author lenovo
 * @create 2018/6/4
 * @since 1.0.0
 */
@Configuration
@IntegrationComponentScan
public class MqttReceiveConfig {

    @Value("${spring.mqtt.username}")
    private String username;

    @Value("${spring.mqtt.password}")
    private String password;

    @Value("${spring.mqtt.url}")
    private String hostUrl;

    @Value("${spring.mqtt.client.id}")
    private String clientId;

    @Value("${spring.mqtt.default.topic}")
    private String defaultTopic;

    @Value("${spring.mqtt.completionTimeout}")
    private int completionTimeout ;   //連線超時


    @Bean
    public MqttConnectOptions getMqttConnectOptions(){
        MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
        mqttConnectOptions.setUserName(username);
        mqttConnectOptions.setPassword(password.toCharArray());
        mqttConnectOptions.setServerURIs(new String[]{hostUrl});
        mqttConnectOptions.setKeepAliveInterval(2);
        return mqttConnectOptions;
    }
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());
        return factory;
    }

    //接收通道
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    //配置client,監聽的topic 
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(clientId+"_inbound", mqttClientFactory(),
                        "hello","hello1");
        adapter.setCompletionTimeout(completionTimeout);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    //通過通道獲取資料
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
                String type = topic.substring(topic.lastIndexOf("/")+1, topic.length());
                if("hello".equalsIgnoreCase(topic)){
                    System.out.println("hello,fuckXX,"+message.getPayload().toString());
                }else if("hello1".equalsIgnoreCase(topic)){
                    System.out.println("hello1,fuckXX,"+message.getPayload().toString());
                }
            }
        };
    }
}

第四,啟動服務測試,使用postment呼叫上一篇的MQTT傳送介面,分別往hello,hello1兩個topic傳送訊息,測試接收情況:

        由此看出,可以正常監聽topic並接收處理訊息了。

        看到這裡,朋友們可能有疑問,如果我要配置多個client,應該怎麼處理呢?這個也簡單,我們只要配置多個通道即可,簡單程式碼如下:

//通道2
@Bean
public MessageChannel mqttInputChannelTwo() {
    return new DirectChannel();
}
//配置client2,監聽的topic:hell2,hello3
@Bean
public MessageProducer inbound1() {
    MqttPahoMessageDrivenChannelAdapter adapter =
            new MqttPahoMessageDrivenChannelAdapter(clientId+"_inboundTwo", mqttClientFactory(),
                    "hello2","hello3");
    adapter.setCompletionTimeout(completionTimeout);
    adapter.setConverter(new DefaultPahoMessageConverter());
    adapter.setQos(1);
    adapter.setOutputChannel(mqttInputChannelTwo());
    return adapter;
}

//通過通道2獲取資料
@Bean
@ServiceActivator(inputChannel = "mqttInputChannelTwo")
public MessageHandler handlerTwo() {
    return new MessageHandler() {
        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
            String type = topic.substring(topic.lastIndexOf("/")+1, topic.length());
            if("hello2".equalsIgnoreCase(topic)){
                System.out.println("hello2 clientTwo,"+message.getPayload().toString());
            }else if("hello3".equalsIgnoreCase(topic)){
                System.out.println("hello3 clientTwo,"+message.getPayload().toString());
            }
        }
    };
}

        這樣一來,我們就配置了兩個client,client1監聽處理hello、hello1主題訊息,client2監聽處理hello2、hello3主題,測試一下:

         從輸出結果可以看出,我們傳送不同的訊息,分別由不同的client處理。所以,小夥伴,你理解了嗎?

        【轉載請註明出處——大道迷途】