1. 程式人生 > >SpringBoot整合ActiveMq,並將管理介面整合到後臺系統

SpringBoot整合ActiveMq,並將管理介面整合到後臺系統

1:下載安裝

下載地址: http://activemq.apache.org/download.html

開發環境使用Windows版本,執行在自己的主機,防止相互干擾。

解壓縮,點選下圖所示批處理檔案,即可執行。

image.png

預設的管理後臺地址為:ip:8161,可以訪問此網址驗證是否安裝成功。

image.png

管理平臺預設使用者名稱:admin,預設密碼:admin

2:基本介紹

MQ是訊息中介軟體,是一種在分散式系統中應用程式藉以傳遞訊息的媒介,常用的有ActiveMQ,RabbitMQ,Kafka。ActiveMQ是Apache下的開源專案,完全支援JMS1.1和J2EE1.4規範的JMS Provider實現。 

特點: 

1、支援多種語言編寫客戶端 

2、對spring的支援,很容易和spring整合 

3、支援多種傳輸協議:TCP,SSL,NIO,UDP等 

4、支援AJAX 

訊息形式: 

1、點對點(queue) 

2、一對多(topic) 

3:專案整合

配置pom.xml:

         <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.14.5</version>
        </dependency>
 

配置全域性配置檔案application-*.yml

#訊息佇列配置
activemq:
    user: admin
    password: admin
    broker-url: tcp://127.0.0.1:61616
    pool:
      enabled: true
      max-connections: 10
 
#可以根據業務,配置多個佇列名
#訊息佇列的 queue 名稱
queueName: publish.queue
#訊息佇列的 topic 名稱
topicName: publish.topic

新增啟動配置ActiveMqConfig.java:

package cc.ahxb.config;

/**
 * Describe
 *
 * @author Gao
 * @date 2018/12/1
 */

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;

import javax.jms.Queue;
import javax.jms.Topic;


/**
 * @author: Gao
 */
@Configuration
public class ActiveMqConfig {
    @Value("${queueName}")
    private String queueName;

    @Value("${topicName}")
    private String topicName;

    @Value("${spring.activemq.user}")
    private String usrName;

    @Value("${spring.activemq.password}")
    private  String password;

    @Value("${spring.activemq.broker-url}")
    private  String brokerUrl;

    /**
     * 實際專案中,可以通過不同的queueName,分別例項化多個Queue,用在不同型別的訊息的區分
     * @return
     */
    @Bean
    public Queue queue(){
        return new ActiveMQQueue(queueName);
    }

    @Bean
    public Topic topic(){
        return new ActiveMQTopic(topicName);
    }

    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory(usrName, password, brokerUrl);
    }

    @Bean
    public JmsListenerContainerFactory jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory){
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setConnectionFactory(connectionFactory);

        return bean;
    }

    @Bean
    public JmsListenerContainerFactory jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory){
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        //設定為釋出訂閱方式, 預設情況下使用的生產消費者方式
        bean.setPubSubDomain(true);
        bean.setConnectionFactory(connectionFactory);
        return bean;
    }
}

4:定義訊息佇列消費者:

抽象介面

image.png

image.png

QueueConsumer.java

package cc.ahxb.mq.consumer;

import cc.ahxb.mqinterface.IMqConsumer;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.stereotype.Component;

import javax.jms.*;


/**
 * Describe
 *
 * @author Jxx
 * @date 2018/12/1
 */
//加上Component註解,啟動即監聽了訊息佇列。
@Component
public class QueueConsumer implements IMqConsumer {

    @Autowired
    private ActiveMQConnectionFactory connectionFactory;

    /**
     * 接收到的Queue型別的訊息,
     * destination(訊息的目的地)為"publish.queue",在配置檔案中配置
     * containerFactory(容器工廠),jmsListenerContainerQueue,在ActiveMqConfig 中初始化
     * @param message  接收到的訊息
     * @throws Exception
     */
    @Override
    @JmsListener(destination = "publish.queue", containerFactory = "jmsListenerContainerQueue")
    public void onReceive(String message) throws Exception {
        System.out.println("接收到publish.queue中訊息:"+message+"");

    }

    /**
     * 設定消費者接收器(接收自定義佇列的訊息,會阻塞執行緒)
     * @param messageListener  訊息監聽器
     * @param destination    自定義佇列的名稱,必須與傳送端保持一致
     *
     */
    @Override
    public void setMessageListener(MessageListener messageListener,String destination) throws Exception {
        //1、建立工廠連線物件,需要制定ip和埠號

        //2、使用連線工廠建立一個連線物件
        Connection connection = connectionFactory.createConnection();
        //3、開啟連線
        connection.start();
        //4、使用連線物件建立會話(session)物件
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5、使用會話物件建立目標物件,包含queue和topic(一對一和一對多)
        Queue queue = session.createQueue(destination);
        //6、使用會話物件建立生產者物件
        MessageConsumer consumer = session.createConsumer(queue);
        //7、向consumer物件中設定一個messageListener物件,用來接收訊息
        consumer.setMessageListener(messageListener);
        //8、程式等待接收使用者訊息
        System.in.read();
        //9、關閉資源
        consumer.close();
        session.close();
        connection.close();



    }

}

TopicConsumer.java

package cc.ahxb.mq.consumer;

import cc.ahxb.mqinterface.IMqConsumer;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

import javax.jms.*;

/**
 * Describe
 *
 * @author Jxx
 * @date 2018/12/1
 */
//加上Component註解,啟動即監聽了訊息佇列。
@Component
public class TopicConsumer implements IMqConsumer {

    @Autowired
    private ActiveMQConnectionFactory connectionFactory;

    /**
     * 接收到的Topic型別的訊息,
     * destination(訊息的目的地)為"publish.topic",在配置檔案中配置
     * containerFactory(容器工廠),jmsListenerContainerTopic,在ActiveMqConfig 中初始化
     * @param message  接收到的訊息
     * @throws Exception
     */
    @Override
    @JmsListener(destination = "publish.topic", containerFactory = "jmsListenerContainerTopic")
    public void onReceive(String message) throws Exception {

        System.out.println("接收到publish.topic中訊息:"+message+"");

    }
    /**
     * 設定消費者接收器(接收自定義佇列的訊息,會阻塞執行緒)
     * @param messageListener  訊息監聽器
     * @param destination    自定義佇列的名稱,必須與傳送端保持一致
     *  @throws Exception
     */

    @Override
    public void setMessageListener(MessageListener messageListener, String destination) throws Exception {
        //2、使用連線工廠建立一個連線物件
        Connection connection = connectionFactory.createConnection();
        //3、開啟連線
        connection.start();
        //4、使用連線物件建立會話(session)物件
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5、使用會話物件建立目標物件,包含queue和topic(一對一和一對多)
        Topic topic = session.createTopic(destination);
        //6、使用會話物件建立生產者物件
        MessageConsumer consumer = session.createConsumer(topic);
        //7、向consumer物件中設定一個messageListener物件,用來接收訊息
        consumer.setMessageListener(messageListener);
        //8、程式等待接收使用者訊息
        System.in.read();
        //9、關閉資源
        consumer.close();
        session.close();
        connection.close();
    }
}

5:定義訊息佇列的生產者:

image.png

ActiveMqProducer.java

package cc.ahxb.mq.producer;

import cc.ahxb.mqinterface.IMqProducer;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;

import javax.jms.Queue;
import javax.jms.Topic;

/**
 * Describe
 *
 * @author Jxx
 * @date 2018/12/1
 */

@Component
public class ActiveMqProducer implements IMqProducer {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
    @Autowired
    private Queue queue;
    @Autowired
    private Topic topic;

    /**
     * 傳送Queue型別的訊息
     * @param message
     * @throws Exception
     */
    @Override
    public void sendQueueMessage(String message) throws Exception {
        jmsMessagingTemplate.convertAndSend(this.queue,message);
    }

    /**
     * 傳送Topic型別的訊息
     * @param message
     * @throws Exception
     */
    @Override
    public void sendTopicMessage(String message) throws Exception {

        jmsMessagingTemplate.convertAndSend(this.topic,message);
    }


    /**
     *  向自定義佇列傳送訊息
     *  @param queueName  自定義的 destination佇列名
     * @param message    訊息內容
     * @throws Exception
     */
    @Override
    public void sendQueueMessage(String queueName,String message) throws Exception {

        jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName),message);

    }




}

6:呼叫示例:

package cc.ahxb.controller;

import cc.ahxb.model.Log;
import cc.ahxb.mq.consumer.QueueConsumer;
import cc.ahxb.mq.producer.ActiveMqProducer;
import cc.ahxb.util.JsonUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.listener.adapter.AbstractAdaptableMessageListener;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import javax.jms.*;

/**
 * Describe
 *
 * @author Jxx
 * @date 2018/12/1
 */
@Controller
@RequestMapping("/mq")
public class MqController {

    @Autowired
    private ActiveMqProducer activeMqProducer;

    @GetMapping(value = "/sendMessage")
    @ResponseBody
    public JsonUtil sendMessage(){
        JsonUtil result = new JsonUtil();
        try {
            activeMqProducer.sendQueueMessage("測試訊息佇列");
            result.setMessage("傳送成功");
        }catch (Exception e){
            e.printStackTrace();
            result.setMessage("傳送失敗,請重新發送");
        }
        return result;
    }
}

補充:如果需要做到在Iframe中顯示管理介面,需要更改activeMQ 自帶伺服器的X-FRAME-OPTIONS策略