使用Spring配置ActiveMQ的釋出訂閱模式
阿新 • • 發佈:2019-01-10
通過Spring對ActiveMQ進行配置開發,釋出訂閱模式,支援訊息的持久化。
需要 Spring 2.5 以上版本(CachingConnectionFactory 自 2.5.3 起提供)。如果有多個持久訂閱者,每個訂閱者都必須指定不同的 clientId,否則後連線的訂閱者會因 clientId 衝突而無法建立連線。
釋出者的配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
                           http://www.springframework.org/schema/context
                           http://www.springframework.org/schema/context/spring-context-2.5.xsd">

    <!-- Publisher side: JMS connection factory. A caching wrapper around the
         ActiveMQ factory so the template reuses the connection and sessions. -->
    <bean id="myConnectionFactory"
          class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <!-- Address of the ActiveMQ broker -->
                <property name="brokerURL" value="tcp://localhost:61616" />
                <!-- Send messages asynchronously -->
                <property name="useAsyncSend" value="true" />
            </bean>
        </property>
        <!-- Number of sessions kept in the cache -->
        <property name="sessionCacheSize" value="10" />
    </bean>

    <!-- Destination the messages are published to (a topic) -->
    <bean id="myDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <!-- Name of the topic -->
        <constructor-arg index="0" value="Online.Notice.Topic" />
    </bean>

    <!-- JMS template used by the publisher code (looked up as "myJmsTemplate") -->
    <bean id="myJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- Publish/subscribe (topic) domain -->
        <property name="pubSubDomain" value="true" />
        <property name="connectionFactory" ref="myConnectionFactory" />
        <property name="defaultDestination" ref="myDestination" />
        <property name="receiveTimeout" value="10000" />
    </bean>

</beans>
釋出者的程式碼
package com.xikang.jms; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.TextMessage; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; public class SimpleJMSSender { public static void main(String[] args) { ApplicationContext ctx = new ClassPathXmlApplicationContext("applicationContext-send.xml"); JmsTemplate jmsTemplate = (JmsTemplate) ctx.getBean("myJmsTemplate"); for (int i = 0; i < 10; i++) { jmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { TextMessage msg = session.createTextMessage(); // 設定訊息屬性 msg.setStringProperty("phrCode", "C001"); // 設定訊息內容 msg.setText("Hello World!"); return msg; } }); } } }
訂閱者的配置:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
                           http://www.springframework.org/schema/context
                           http://www.springframework.org/schema/context/spring-context-2.5.xsd">

    <!-- Subscriber side: JMS connection factory -->
    <bean id="myConnectionFactory"
          class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- Number of sessions kept in the cache -->
        <property name="sessionCacheSize" value="10" />
        <!-- JMS client ID of this subscriber. This is the single place where it
             is configured; every subscriber needs its own distinct value. -->
        <property name="clientId" value="client_119" />
        <property name="targetConnectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <!-- Address of the ActiveMQ broker -->
                <property name="brokerURL" value="tcp://localhost:61616" />
            </bean>
        </property>
    </bean>

    <!-- Destination the messages are received from (a topic) -->
    <bean id="myDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <!-- Name of the topic -->
        <constructor-arg index="0" value="Online.Notice.Topic" />
    </bean>

    <!-- Message consumer (application class) -->
    <bean id="myTopicConsumer" class="com.xikang.jms.SimpleJMSReceiver" />

    <!-- Listener adapter that delegates incoming messages to the consumer -->
    <bean id="myTopicListener"
          class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
        <constructor-arg ref="myTopicConsumer" />
        <!-- Name of the consumer method that receives the message -->
        <property name="defaultListenerMethod" value="receive" />
        <!-- No message conversion -->
        <property name="messageConverter"><null/></property>
    </bean>

    <!-- Message listener container -->
    <bean id="myListenerContainer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="myConnectionFactory" />
        <!-- Publish/subscribe (topic) domain -->
        <property name="pubSubDomain" value="true" />
        <!-- Durable subscription -->
        <property name="subscriptionDurable" value="true" />
        <property name="receiveTimeout" value="10000" />
        <!-- The client ID is intentionally not repeated here: it is already
             assigned by myConnectionFactory, and a second copy would have to be
             kept identical to it. -->
        <property name="durableSubscriptionName" value="client_119" />
        <property name="destination" ref="myDestination" />
        <property name="messageListener" ref="myTopicListener" />
    </bean>

</beans>
訂閱者的程式碼:
package com.xikang.jms;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.JmsException;
/**
 * Subscriber entry point and message handler.
 *
 * <p>{@link #main(String[])} boots the Spring {@link ApplicationContext}
 * described by {@code applicationContext-receive.xml}; that configuration
 * registers an instance of this class as the listener delegate and names
 * {@link #receive(TextMessage)} as the method to call for each message.
 */
public class SimpleJMSReceiver {

    /**
     * Starts the receiver context and keeps the process alive until it is
     * terminated from outside.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        ClassPathXmlApplicationContext ctx =
                new ClassPathXmlApplicationContext("applicationContext-receive.xml");
        // Close the context (and with it the JMS resources) when the JVM exits.
        ctx.registerShutdownHook();
        try {
            // Park the main thread without consuming CPU; a thread joining
            // itself blocks until it is interrupted.
            Thread.currentThread().join();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and shut down in an orderly fashion.
            Thread.currentThread().interrupt();
            ctx.close();
        }
    }

    /**
     * Prints the {@code phrCode} property and the text body of a received
     * message to standard output.
     *
     * @param message the received text message
     * @throws JmsException declared for callers; not thrown directly by this method
     * @throws JMSException if the property or the body cannot be read
     */
    public void receive(TextMessage message) throws JmsException, JMSException {
        System.out.println(message.getStringProperty("phrCode"));
        System.out.println(message.getText());
    }
}