1. 程式人生 > >使用Spring配置ActiveMQ的釋出訂閱模式

使用Spring配置ActiveMQ的釋出訂閱模式

通過Spring對ActiveMQ進行配置開發,釋出訂閱模式,支援訊息的持久化。

需要 Spring 2.5 以上版本。如果有多個訂閱者,每個訂閱者需要指定不同的 clientId。

釋出者的配置

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
	xsi:schemaLocation="http://www.springframework.org/schema/beans  
         http://www.springframework.org/schema/beans/spring-beans-2.5.xsd  
         http://www.springframework.org/schema/context  
         http://www.springframework.org/schema/context/spring-context-2.5.xsd">

	<!-- Publisher-side wiring: connection factory, topic destination and JmsTemplate. -->

	<!-- JMS connection factory: Spring's CachingConnectionFactory wrapping the ActiveMQ factory -->
	<bean id="myConnectionFactory"
		class="org.springframework.jms.connection.CachingConnectionFactory">
		<!-- Number of JMS sessions to cache -->
		<property name="sessionCacheSize" value="10" />
		<property name="targetConnectionFactory">
			<bean class="org.apache.activemq.ActiveMQConnectionFactory">
				<!-- Broker address -->
				<property name="brokerURL" value="tcp://localhost:61616" />
				 <!-- Whether to send asynchronously -->
				<property name="useAsyncSend" value="true" />
			</bean>
		</property>
	</bean>

	<!-- Destination that messages are sent to (a topic) -->
	<bean id="myDestination" class="org.apache.activemq.command.ActiveMQTopic">
		<!-- Name of the topic -->
		<constructor-arg index="0" value="Online.Notice.Topic" />
	</bean>

	<!-- JMS template used by the publisher; sends go to myDestination unless another destination is given -->
	<bean id="myJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="myConnectionFactory" />
		<property name="defaultDestination" ref="myDestination" />
		<!-- Publish/subscribe (topic) mode -->
		<property name="pubSubDomain" value="true" />
		<!-- Timeout for receive calls on this template (presumably milliseconds; confirm against the JmsTemplate docs) -->
		<property name="receiveTimeout" value="10000" />
	</bean>
</beans>

釋出者的程式碼
package com.xikang.jms;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

/**
 * Command-line publisher: loads {@code applicationContext-send.xml}, looks up the
 * {@code myJmsTemplate} bean and sends a fixed number of identical text messages
 * to the template's default destination.
 */
public class SimpleJMSSender {

	/** Number of messages published per run. */
	private static final int MESSAGE_COUNT = 10;

	/**
	 * Sends {@link #MESSAGE_COUNT} messages and then shuts the Spring context down.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		// Declared as the concrete type so that close() is available below.
		ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("applicationContext-send.xml");
		try {
			JmsTemplate jmsTemplate = (JmsTemplate) ctx.getBean("myJmsTemplate");

			// The creator holds no per-message state, so one instance serves every send.
			MessageCreator creator = new MessageCreator() {
				public Message createMessage(Session session) throws JMSException {
					TextMessage msg = session.createTextMessage();
					// Message property
					msg.setStringProperty("phrCode", "C001");
					// Message body
					msg.setText("Hello World!");
					return msg;
				}
			};

			for (int i = 0; i < MESSAGE_COUNT; i++) {
				jmsTemplate.send(creator);
			}
		} finally {
			// Closing the context destroys its singleton beans, which releases the
			// JMS connection and sessions held by the connection factory instead of
			// leaving them open when main returns.
			ctx.close();
		}
	}
}

訂閱者的配置:
<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
	xsi:schemaLocation="http://www.springframework.org/schema/beans  
         http://www.springframework.org/schema/beans/spring-beans-2.5.xsd  
         http://www.springframework.org/schema/context  
         http://www.springframework.org/schema/context/spring-context-2.5.xsd">

	<!-- Subscriber-side wiring: connection factory, topic, consumer bean, listener adapter and listener container. -->

	<!-- JMS connection factory: Spring's CachingConnectionFactory wrapping the ActiveMQ factory -->
	<bean id="myConnectionFactory"
		class="org.springframework.jms.connection.CachingConnectionFactory">
		<!-- Number of JMS sessions to cache -->
		<property name="sessionCacheSize" value="10" />
		<!-- Client ID of this receiver -->
		<property name="clientId" value="client_119" />
		<property name="targetConnectionFactory">
			<bean class="org.apache.activemq.ActiveMQConnectionFactory">
				<!-- Broker address -->
				<property name="brokerURL" value="tcp://localhost:61616" />
			</bean>
		</property>
	</bean>

	<!-- Destination this subscriber listens on (a topic) -->
	<bean id="myDestination" class="org.apache.activemq.command.ActiveMQTopic">
		<!-- Name of the topic -->
		<constructor-arg index="0" value="Online.Notice.Topic" />
	</bean>

	<!-- Message consumer bean (user-defined class) -->
	<bean id="myTopicConsumer" class="com.xikang.jms.SimpleJMSReceiver" />

	<!-- Message listener: adapter that delegates incoming messages to the consumer bean -->
	<bean id="myTopicListener"
		class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
		<constructor-arg ref="myTopicConsumer" />
		<!-- Name of the method that receives messages -->
		<property name="defaultListenerMethod" value="receive" />
		<!-- No message conversion: the converter is explicitly set to null -->
		<property name="messageConverter"><null/></property>
	</bean>

	<!-- Message listener container -->
	<bean id="myListenerContainer"
		class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="myConnectionFactory" />
		<!-- Publish/subscribe (topic) mode -->
		<property name="pubSubDomain" value="true"/>
		<!-- Durable subscription -->
		<property name="subscriptionDurable" value="true"/>
		<property name="receiveTimeout" value="10000"/>
		<!-- Client ID of this receiver; same value as the clientId set on myConnectionFactory above -->
		<property name="clientId" value="client_119" />
		<!-- Name of the durable subscription -->
		<property name="durableSubscriptionName" value="client_119"/>
		<property name="destination" ref="myDestination" />
		<property name="messageListener" ref="myTopicListener" />
	</bean>

</beans>

訂閱者的程式碼:
package com.xikang.jms;

import javax.jms.JMSException;
import javax.jms.TextMessage;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.JmsException;

/**
 * Subscriber: {@link #main} boots the Spring context from
 * {@code applicationContext-receive.xml} and keeps the process alive, while
 * {@link #receive} prints each incoming message.
 */
public class SimpleJMSReceiver {

	/**
	 * Starts the Spring context and parks the main thread until it is interrupted
	 * or the JVM shuts down.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		// Declared as the concrete type so the lifecycle methods below are available.
		ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("applicationContext-receive.xml");
		// Close the context on JVM shutdown (e.g. Ctrl+C) as well.
		ctx.registerShutdownHook();
		try {
			// Joining the current thread on itself blocks indefinitely without
			// consuming CPU, unlike an empty while(true) loop.
			Thread.currentThread().join();
		} catch (InterruptedException e) {
			// Treat interruption as a request to stop; keep the interrupt flag set.
			Thread.currentThread().interrupt();
		} finally {
			ctx.close();
		}
	}

	/**
	 * Prints the {@code phrCode} string property and the text body of a message
	 * to standard output.
	 *
	 * @param message the message to print
	 * @throws JMSException if the property or the text cannot be read from the message
	 */
	public void receive(TextMessage message) throws JmsException, JMSException {
		System.out.println(message.getStringProperty("phrCode"));
		System.out.println(message.getText());
	}
}