javaEE ActiveMQ,ActiveMQ與Spring整合
阿新 • • 發佈:2018-12-17
需要先安裝ActiveMQ服務,並啟動。下方配置檔案中的brokerURL(範例為tcp://192.168.25.161:61616)需改為實際的ActiveMQ服務地址。
生產者:
applicationContext-activemq.xml(Spring配置檔案,放在classpath的spring目錄下,與測試類載入的路徑一致;配置連線工廠、生產者JmsTemplate與目的地):
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd
           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
           http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">

    <!-- ===== Connection factories ===== -->

    <!-- Vendor-specific factory (ActiveMQ) that actually creates JMS connections.
         brokerURL is the address of the ActiveMQ broker. -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.25.161:61616" />
    </bean>

    <!-- Spring-side factory that wraps the vendor factory. To move away from ActiveMQ,
         only the targetConnectionFactory bean has to be replaced. -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- Points at the real factory that produces JMS connections. -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>

    <!-- ===== Producer ===== -->

    <!-- Spring's JMS helper used to send and receive messages.
         It is wired to the Spring-side connectionFactory defined above. -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory" />
    </bean>

    <!-- ===== Destinations ===== -->

    <!-- Queue destination: point-to-point delivery. -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="spring-queue" />
    </bean>

    <!-- Topic destination: one-to-many delivery (broadcast, publish/subscribe). -->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="itemAddTopic" />
    </bean>

</beans>
Test.java(測試類,傳送訊息):
package cn.e3mall.activemq; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.junit.Test; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; public class Test { @Test public void sendMessage() throws Exception { //初始化spring容器 ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml"); //從容器中獲得JmsTemplate物件(實際專案中可以通過屬性注入)。 JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class); //從容器中獲得一個Destination物件。 Destination destination = (Destination) applicationContext.getBean("queueDestination"); //傳送訊息 jmsTemplate.send(destination, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { return session.createTextMessage("send activemq message"); } }); } }
消費者:
applicationContext-activemq.xml(Spring配置檔案,放在classpath的spring目錄下,與測試類載入的路徑一致;配置連線工廠、目的地、訊息監聽器與監聽容器):
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd
           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
           http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">

    <!-- ===== Connection factories ===== -->

    <!-- Vendor-specific factory (ActiveMQ) that actually creates JMS connections.
         brokerURL is the address of the ActiveMQ broker. -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.25.161:61616" />
    </bean>

    <!-- Spring-side factory that wraps the vendor factory. To move away from ActiveMQ,
         only the targetConnectionFactory bean has to be replaced. -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- Points at the real factory that produces JMS connections. -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>

    <!-- ===== Destinations ===== -->

    <!-- Queue destination: point-to-point delivery. -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="spring-queue" />
    </bean>

    <!-- Topic destination: one-to-many delivery (broadcast, publish/subscribe). -->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="itemAddTopic" />
    </bean>

    <!-- ===== Consumer ===== -->

    <!-- Application-defined listener; it must implement javax.jms.MessageListener. -->
    <bean id="myMessageListener" class="cn.e3mall.search.message.MyMessageListener" />

    <!-- Listener container: once the Spring context has been initialised it listens on
         queueDestination and hands each message to myMessageListener. -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="queueDestination" />
        <property name="messageListener" ref="myMessageListener" />
    </bean>

</beans>
Test.java(測試類,監聽訊息):
package cn.e3mall.activemq;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Consumer-side test: boots the Spring {@link ApplicationContext} described by
 * {@code spring/applicationContext-activemq.xml} and keeps it alive until a byte is read
 * from standard input, so the beans declared in that file can process incoming messages.
 *
 * <p>The JUnit annotation is written fully qualified ({@code @org.junit.Test}) because this
 * class is itself named {@code Test}; a single-type import of {@code org.junit.Test} would
 * clash with the class declaration and fail to compile.
 */
public class Test {

    /**
     * Starts the context and blocks on {@code System.in} so the test does not finish
     * while messages are still expected.
     *
     * @throws Exception if the context cannot be started or reading standard input fails
     */
    @org.junit.Test
    public void msgConsumer() throws Exception {
        // Initialise the Spring container; listening starts once the context is up.
        // try-with-resources closes the context (and the beans it created) when the wait ends.
        try (ClassPathXmlApplicationContext applicationContext =
                new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml")) {
            // Wait: block until input arrives on stdin.
            System.in.read();
        }
    }
}
MyMessageListener.java(自定義的訊息監聽器,實現MessageListener介面。監聽到訊息後的具體業務邏輯):
package cn.e3mall.search.message;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
 * JMS listener that prints the body of every text message it receives to standard output.
 *
 * <p>Messages that are not {@link TextMessage} instances are reported on standard error and
 * skipped instead of failing with a {@link ClassCastException}.
 */
public class MyMessageListener implements MessageListener {

    /**
     * Handles one incoming message.
     *
     * @param message the delivered JMS message; anything other than a {@link TextMessage}
     *                (including {@code null}) is ignored
     */
    @Override
    public void onMessage(Message message) {
        // Guard the cast: a destination can carry other message types (bytes, map, object, ...).
        if (!(message instanceof TextMessage)) {
            System.err.println("Ignoring non-text JMS message: " + message);
            return;
        }

        TextMessage textMessage = (TextMessage) message;
        try {
            // Extract and print the message content.
            String text = textMessage.getText();
            System.out.println(text);
        } catch (JMSException e) {
            // Best effort: report the failure and carry on with the next message
            // rather than propagating out of the listener callback.
            System.err.println("Failed to read text message body: " + e.getMessage());
            e.printStackTrace();
        }
    }
}