1. 程式人生 > >ActiveMQ(三)——spring管理ActiveMQ,實現傳送和接收效果

ActiveMQ(三)——spring管理ActiveMQ,實現傳送和接收效果

一、前言 在前一篇部落格中,小編向大家簡單的介紹了一下ActiveMQ的訊息處理方式,包括了點對點,釋出訂閱兩種模式。寫向大家展示了一下如何使用,但是在真正開發的時候我們是不會寫那麼一大片程式碼,從建立連線工廠,再由連線工廠建立連線物件,連線物件開啟連線,連線物件然後建立session,session建立目的地,用於連線資料。

這個過程是比較複雜的,在開發的時候一般我們會使用Spring ,把這些操作來交給Spring管理,因為Spring的核心功能中包括了依賴注入,由spring容器建立連線工廠,方便操作。

二、ActiveMQ整合Spring 2.1 引入相關jar spring相關:

         <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
        </dependency>

ActiveMQ相關:

      <!--ActiveMQ-->
      <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>${activemq.version}</version>
      </dependency>

2.2 建立applicationContext-mq.xml檔案,用於配置整合資訊

  • 這裡主要事實配置連線工程ConnectionFactory。 這裡是把ActiveMQConnectionFactory交給spring的jsm的SingleConnectionFactory管理:

  • 配置了生產者 使用spring提供的jsm工具類,可以進行訊息傳送和接收

  • 配置佇列或主題的目的地

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
       xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd
    http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">

    <!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.137.15:61616" />
    </bean>
    <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>

    <!-- 配置生產者 -->
    <!-- Spring提供的JMS工具類,它可以進行訊息傳送、接收等 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory物件 -->
        <property name="connectionFactory" ref="connectionFactory" />
    </bean>

    <!--這個是佇列目的地,點對點的 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>Ares-queue</value>
        </constructor-arg>
    </bean>
    <!--這個是主題目的地,一對多的 -->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="Ares_topic" />
    </bean>

    <!-- 接收訊息 -->
    <!-- 配置監聽器 -->
    <bean id="queueConsumeListener" class="com.dmsd.mq.activemq.listener.QueueConsumeListener" />
    <bean id="transactionBizMessageListener" class="com.dmsd.mq.activemq.listener.TransactionBizMessageListener" />

    <!-- 訊息監聽容器 -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="queueDestination" />
        <property name="messageListener" ref="transactionBizMessageListener" />
    </bean>
</beans>

三、傳送訊息到Queue 查詢職工id為3的查詢職員的相關資訊,並把 查詢的資訊放入到mq 中,這裡呢,小編為了突出存放的過程,就查詢了10次,並且都儲存到mq中:

這裡就用到了jms提供的配置類jmsTemplate的例項,通過jmsTemplate來進行儲存訊息操作:

package com.dmsd.mq.activemq;

import com.dmsd.dao.TStuffMapper;
import com.dmsd.pojo.TStuff;
import com.dmsd.tool.JacksonJsonUntil;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

/**
 * Created by Ares on 2017/11/14.
 */
@Service
public class QueneProducer implements QueneProducerFacade {

    //注入ActiveMQ的模板,在spring中注入的bean
    @Autowired
    JmsTemplate jmsTemplate;

    //注入Queue的目的地址,在spring中配置的bean
    @Autowired
    ActiveMQQueue queueDestination; 

    @Autowired
    TStuffMapper tStuffMapper;

    /**
     * 傳送訊息到佇列-王雷-2017年11月14日16:25:34
     */
    @Override
    public void QueneProducer()  {
        for (int i=0;i<10;i++){
            System.out.println("開始傳送訊息=====》"+i);

            //使用JmsTemplate物件傳送訊息。
            jmsTemplate.send(queueDestination, new MessageCreator() {
                @Override
                public Message createMessage(Session session) throws JMSException {
                    //根據職工id查詢職員
                    TStuff tStuff = tStuffMapper.selectByPrimaryKey((long) 3);
                    //轉換為json
                    String json = null;
                    try {
                        json = JacksonJsonUntil.objectToJson(tStuff);
                    } catch (JsonProcessingException e) {
                        e.printStackTrace();
                    }
                    //建立一個訊息物件並返回
                    TextMessage textMessage = session.createTextMessage(json);
                    return textMessage;
                }
            });
            System.out.println("第"+i+"條訊息傳送完成");
        }
    }
}

Controller程式碼:

package com.dmsd.mq.activemq.controller;

import com.dmsd.mq.activemq.QueneProducerFacade;
import com.dmsd.tool.AresResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;

/**
 * Created by Ares on 2017/11/16.
 */
@Controller
public class ActiveMqController {

    //定義列印日誌相關
    private static final Logger logger = LoggerFactory.getLogger(ActiveMqController.class);

    @Autowired
    QueneProducerFacade queneProducerFacade;


    @RequestMapping(value="/QueneProducer",method = RequestMethod.GET)
    @ResponseBody
    @CrossOrigin
    public AresResult QueneProducer(){
        try {
            queneProducerFacade.QueneProducer();
            return AresResult.build("0000","訊息上傳mq成功");
        }catch (Exception e){
            logger.error("訊息上傳mq失敗");
            e.printStackTrace();
        }
        return AresResult.build("1111","訊息上傳mq失敗");
    }
}

執行專案: MQ初始的情況: 在佇列中沒有訊息,pending message為0, 在這裡插入圖片描述 向MQ中儲存10條訊息: 在這裡插入圖片描述 在這裡插入圖片描述 MQ中的情況:Messages Enqueued增加了10 ,Messages Dequeued 的值也增加了10。說明剛進去的訊息已經被消費者消費了。

Messages Enqueued 進入佇列的訊息 進入佇列的總數量,包括出佇列的。 這個數量只增不減

Messages Dequeued 出了佇列的訊息 可以理解為是消費這消費掉的數量 在這裡插入圖片描述 四、從ActiveMQ中取出訊息 這裡我們使用了監聽機制,ActiveMQ有三種訊息監聽器:MessageListener、SessionAwareMessageListener、MessageListenerAdapter。關於這三種的區別,小編在後面的部落格向大家介紹。

在上文中,小編向大家介紹了ActiveMQ整合spring。如果我們需要配置訊息接收,也需要在配置檔案中配置訊息監聽器:

這裡我們配置了監聽器,一直監聽要監聽的佇列,當佇列有訊息的時候,就會獲取的message,然後傳給後臺使用。

<!-- 接收訊息 -->
    <!-- 配置監聽器 -->
    <bean id="queueConsumeListener" class="com.dmsd.mq.activemq.listener.QueueConsumeListener" />
    <bean id="transactionBizMessageListener" class="com.dmsd.mq.activemq.listener.TransactionBizMessageListener" />

    <!-- 訊息監聽容器 -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="queueDestination" />
        <property name="messageListener" ref="transactionBizMessageListener" />

使用SessionAwareMessageListener監聽器:

可能很多朋友被監聽器嚇到了,其實就是一個類,這個類實現了SessionAwareMessageListener介面,實現了onMessage(message,session)方法,當監聽器所監聽的佇列有了資料後,就可以把資料獲取到,並且使用。

package com.dmsd.mq.activemq.listener;

import com.dmsd.api.StuffServiceFacade;
import com.dmsd.pojo.TStuff;
import com.dmsd.tool.AresResult;
import com.dmsd.tool.JacksonJsonUntil;
import org.apache.activemq.Message;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.listener.SessionAwareMessageListener;

import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.io.IOException;

/**
 * Created by Ares on 2017/11/15.
 */
public class TransactionBizMessageListener implements SessionAwareMessageListener<Message> {

    @Autowired
    StuffServiceFacade stuffServiceFacade;


    @Override
    public void onMessage(Message message, Session session) throws JMSException {
        //獲取訊息內容
        ActiveMQTextMessage msg = (ActiveMQTextMessage) message;
        System.out.println(msg);
        String text = null;
        text = msg.getText();
        System.out.println(text);
        //json轉換為物件
        try {
            TStuff tstuff = JacksonJsonUntil.jsonToPojo(text, TStuff.class);
            TStuff stuff = new TStuff();
            stuff.setSex(tstuff.getSex());
            stuff.setName("德瑪西亞");
            stuff.setClassid(tstuff.getClassid());
            stuff.setAddress(tstuff.getAddress());
            int i = stuffServiceFacade.insertStuff(stuff);
            if (i>0){
                System.out.println("新增職員成功");
            }
            else {
                System.out.println("新增職員失敗");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

當佇列中有訊息的時候,會自動獲取訊息,然後執行onmessage方法,小編在onmessage方法中把佇列中的訊息取出來了,儲存到資料庫中了。

列印的儲存情況: 在這裡插入圖片描述 資料庫中儲存的資料: 在這裡插入圖片描述 五、小結 這個傳送和接收的過程和spring結合只有是比較簡單了,使用也比較方便了,重點還是要理解這個過程,spring和activeMQ整合的過程。下面的部落格中小編會向大家ActiveMQ的三種訊息監聽方式。