ActiveMQ(三)——spring管理ActiveMQ,實現傳送和接收效果
一、前言 在前一篇部落格中,小編向大家簡單的介紹了一下ActiveMQ的訊息處理方式,包括了點對點,釋出訂閱兩種模式。寫向大家展示了一下如何使用,但是在真正開發的時候我們是不會寫那麼一大片程式碼,從建立連線工廠,再由連線工廠建立連線物件,連線物件開啟連線,連線物件然後建立session,session建立目的地,用於連線資料。
這個過程是比較複雜的,在開發的時候一般我們會使用Spring ,把這些操作來交給Spring管理,因為Spring的核心功能中包括了依賴注入,由spring容器建立連線工廠,方便操作。
二、ActiveMQ整合Spring 2.1 引入相關jar spring相關:
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context-support</artifactId> </dependency>
ActiveMQ相關:
<!--ActiveMQ-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>${activemq.version}</version>
</dependency>
2.2 建立applicationContext-mq.xml檔案,用於配置整合資訊
-
這裡主要事實配置連線工程ConnectionFactory。 這裡是把ActiveMQConnectionFactory交給spring的jsm的SingleConnectionFactory管理:
-
配置了生產者 使用spring提供的jsm工具類,可以進行訊息傳送和接收
-
配置佇列或主題的目的地
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd"> <!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.137.15:61616" /> </bean> <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="targetConnectionFactory" /> </bean> <!-- 配置生產者 --> <!-- Spring提供的JMS工具類,它可以進行訊息傳送、接收等 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory物件 --> <property name="connectionFactory" ref="connectionFactory" /> </bean> <!--這個是佇列目的地,點對點的 --> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>Ares-queue</value> </constructor-arg> </bean> <!--這個是主題目的地,一對多的 --> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="Ares_topic" /> </bean> <!-- 接收訊息 --> <!-- 配置監聽器 --> <bean id="queueConsumeListener" class="com.dmsd.mq.activemq.listener.QueueConsumeListener" /> <bean id="transactionBizMessageListener" class="com.dmsd.mq.activemq.listener.TransactionBizMessageListener" /> <!-- 訊息監聽容器 --> <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="queueDestination" /> <property name="messageListener" ref="transactionBizMessageListener" /> </bean> </beans>
三、傳送訊息到Queue 查詢職工id為3的查詢職員的相關資訊,並把 查詢的資訊放入到mq 中,這裡呢,小編為了突出存放的過程,就查詢了10次,並且都儲存到mq中:
這裡就用到了jms提供的配置類jmsTemplate的例項,通過jmsTemplate來進行儲存訊息操作:
package com.dmsd.mq.activemq;
import com.dmsd.dao.TStuffMapper;
import com.dmsd.pojo.TStuff;
import com.dmsd.tool.JacksonJsonUntil;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
* Created by Ares on 2017/11/14.
*/
@Service
public class QueneProducer implements QueneProducerFacade {
//注入ActiveMQ的模板,在spring中注入的bean
@Autowired
JmsTemplate jmsTemplate;
//注入Queue的目的地址,在spring中配置的bean
@Autowired
ActiveMQQueue queueDestination;
@Autowired
TStuffMapper tStuffMapper;
/**
* 傳送訊息到佇列-王雷-2017年11月14日16:25:34
*/
@Override
public void QueneProducer() {
for (int i=0;i<10;i++){
System.out.println("開始傳送訊息=====》"+i);
//使用JmsTemplate物件傳送訊息。
jmsTemplate.send(queueDestination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
//根據職工id查詢職員
TStuff tStuff = tStuffMapper.selectByPrimaryKey((long) 3);
//轉換為json
String json = null;
try {
json = JacksonJsonUntil.objectToJson(tStuff);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
//建立一個訊息物件並返回
TextMessage textMessage = session.createTextMessage(json);
return textMessage;
}
});
System.out.println("第"+i+"條訊息傳送完成");
}
}
}
Controller程式碼:
package com.dmsd.mq.activemq.controller;
import com.dmsd.mq.activemq.QueneProducerFacade;
import com.dmsd.tool.AresResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
/**
* Created by Ares on 2017/11/16.
*/
@Controller
public class ActiveMqController {
//定義列印日誌相關
private static final Logger logger = LoggerFactory.getLogger(ActiveMqController.class);
@Autowired
QueneProducerFacade queneProducerFacade;
@RequestMapping(value="/QueneProducer",method = RequestMethod.GET)
@ResponseBody
@CrossOrigin
public AresResult QueneProducer(){
try {
queneProducerFacade.QueneProducer();
return AresResult.build("0000","訊息上傳mq成功");
}catch (Exception e){
logger.error("訊息上傳mq失敗");
e.printStackTrace();
}
return AresResult.build("1111","訊息上傳mq失敗");
}
}
執行專案: MQ初始的情況: 在佇列中沒有訊息,pending message為0, 向MQ中儲存10條訊息: MQ中的情況:Messages Enqueued增加了10 ,Messages Dequeued 的值也增加了10。說明剛進去的訊息已經被消費者消費了。
Messages Enqueued 進入佇列的訊息 進入佇列的總數量,包括出佇列的。 這個數量只增不減
Messages Dequeued 出了佇列的訊息 可以理解為是消費這消費掉的數量 四、從ActiveMQ中取出訊息 這裡我們使用了監聽機制,ActiveMQ有三種訊息監聽器:MessageListener、SessionAwareMessageListener、MessageListenerAdapter。關於這三種的區別,小編在後面的部落格向大家介紹。
在上文中,小編向大家介紹了ActiveMQ整合spring。如果我們需要配置訊息接收,也需要在配置檔案中配置訊息監聽器:
這裡我們配置了監聽器,一直監聽要監聽的佇列,當佇列有訊息的時候,就會獲取的message,然後傳給後臺使用。
<!-- 接收訊息 -->
<!-- 配置監聽器 -->
<bean id="queueConsumeListener" class="com.dmsd.mq.activemq.listener.QueueConsumeListener" />
<bean id="transactionBizMessageListener" class="com.dmsd.mq.activemq.listener.TransactionBizMessageListener" />
<!-- 訊息監聽容器 -->
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueDestination" />
<property name="messageListener" ref="transactionBizMessageListener" />
使用SessionAwareMessageListener監聽器:
可能很多朋友被監聽器嚇到了,其實就是一個類,這個類實現了SessionAwareMessageListener介面,實現了onMessage(message,session)方法,當監聽器所監聽的佇列有了資料後,就可以把資料獲取到,並且使用。
package com.dmsd.mq.activemq.listener;
import com.dmsd.api.StuffServiceFacade;
import com.dmsd.pojo.TStuff;
import com.dmsd.tool.AresResult;
import com.dmsd.tool.JacksonJsonUntil;
import org.apache.activemq.Message;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.listener.SessionAwareMessageListener;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.io.IOException;
/**
* Created by Ares on 2017/11/15.
*/
public class TransactionBizMessageListener implements SessionAwareMessageListener<Message> {
@Autowired
StuffServiceFacade stuffServiceFacade;
@Override
public void onMessage(Message message, Session session) throws JMSException {
//獲取訊息內容
ActiveMQTextMessage msg = (ActiveMQTextMessage) message;
System.out.println(msg);
String text = null;
text = msg.getText();
System.out.println(text);
//json轉換為物件
try {
TStuff tstuff = JacksonJsonUntil.jsonToPojo(text, TStuff.class);
TStuff stuff = new TStuff();
stuff.setSex(tstuff.getSex());
stuff.setName("德瑪西亞");
stuff.setClassid(tstuff.getClassid());
stuff.setAddress(tstuff.getAddress());
int i = stuffServiceFacade.insertStuff(stuff);
if (i>0){
System.out.println("新增職員成功");
}
else {
System.out.println("新增職員失敗");
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
當佇列中有訊息的時候,會自動獲取訊息,然後執行onmessage方法,小編在onmessage方法中把佇列中的訊息取出來了,儲存到資料庫中了。
列印的儲存情況: 資料庫中儲存的資料: 五、小結 這個傳送和接收的過程和spring結合只有是比較簡單了,使用也比較方便了,重點還是要理解這個過程,spring和activeMQ整合的過程。下面的部落格中小編會向大家ActiveMQ的三種訊息監聽方式。