1. 程式人生 > >淺談jms之(通過spring整合activeMQ實現jms)例項

淺談jms之(通過spring整合activeMQ實現jms)例項

上篇說到使用activemq 實現jms訊息服務 的例項,但是那個是沒有spring進行整合和管理的;其實spring完整提供對jms的支援,所以我們可以通過spring來管理整合activemq 實現jms訊息傳遞服務。

1.建立maven工程(我的專案都是通過maven管理的),在pom中引入我們要用的jar包,如activemq-core、spring-web、還有spring對jms支援的jar包等,如果不是通過maven 管理的  可以去spring官網下載相應的jar包。

<pre name="code" class="html"><dependency>
    		<groupId>org.apache.activemq</groupId>
    		<artifactId>activemq-core</artifactId>
    		<version>5.7.0</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-jms</artifactId>
			<version>3.0.5.RELEASE</version>		
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-web</artifactId>
			<version>3.0.5.RELEASE</version>
		</dependency>

2.  ①按上篇中的開發流程,無論是對於一個queue佇列模式還是 topic 模式的 , 首先需要連線activemq服務的connectionFactory。然後我們需要建立一個queue佇列或一個topic 主題(destination 訊息傳送的目的站)。還有就是spring -jms 的api為我們提供了JmsTemplate,查看了 JmsTemplate類發現 spring已經將我們需要的 獲取activemq 的連線 傳送 都做了層封裝;所以我們使用的時候只用呼叫模板類中的方法 就可以了 。則我們要在spring-bean.xml(applicationContext..xml)配置檔案中配置上這些:

queue 模式:

<!-- 配置jms 連線mq 的工廠類 -->
		<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
			<property name="brokerURL" value="tcp://localhost:61616"/>
		</bean>
		<!-- (點到點的佇列 )queueDest:接收訊息的目的站 -->
		<bean id="queueDest" class="org.apache.activemq.command.ActiveMQQueue" >
			<constructor-arg index="0" value="spring-activemq-queue"/>
		</bean>
		<!-- 使用模板JMSTemplate : 在Spring框架中使用JMS傳遞訊息有兩種方式:JMS template
		和message listener container,
		前者用於同步收發訊息,後者用於非同步收發訊息。 -->
		<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
			<property name="connectionFactory" ref="connectionFactory"/>
			<property name="defaultDestination" ref="queueDest"/>
		</bean>
topic 模式:
<!-- pub/sub的destination:接收訂閱訊息的目的站 -->
		<bean id="topicDest" class="org.apache.activemq.command.ActiveMQTopic">
			<!-- 設定主題名稱 -->
			<constructor-arg index="0" value="spring-activemq-topic"/>
		</bean>
		<bean id="jmsTopicTemplate"  class="org.springframework.jms.core.JmsTemplate">  
        	<property name="connectionFactory" ref="connectionFactory" />  
        	<property name="defaultDestination" ref="topicDest" />  
        	<!-- 配置是否為釋出訂閱者模式,預設為false -->  
        	<property name="pubSubDomain" value="true"/>  
    	</bean> 

②.接下來我們要建立queue佇列的生產者 和消費者 :

QueueProducer.java

/**
 * 點對點  生產者
 * @author leo
 *
 */
public class QueueProducer {
	/**
	 * jms 模板
	 */
	private JmsTemplate jmsTemplate;
	/**
	 * queuedest 訊息傳送目的站
	 */
	private Destination destination;
	
	public void setJmsTemplate(JmsTemplate jmsTemplate) {
		this.jmsTemplate = jmsTemplate;
	}
	public void setDestination(Destination destination) {
		this.destination = destination;
	}
	/**
	 * 生產者傳送訊息
	 */
	public void sendMsg(){
		//訊息構造器
		MessageCreator creator = new MessageCreator() {
			@Override
			public Message createMessage(Session session) throws JMSException {
				TextMessage message =session.createTextMessage();
				message.setText("傳送點到點型別的資訊");
				return message;
			}
		};
		jmsTemplate.send(this.destination, creator);
	}
	
	
}
QueueCustomer.java 
/**
 * point -to -point  消費者
 * 
 * @author leo
 *
 */
public class QueueCustomer {
	//jms 模板
	private JmsTemplate jmsTemplate;
	//目的地
	private Destination destination;
	
	public void setJmsTemplate(JmsTemplate jmsTemplate) {
		this.jmsTemplate = jmsTemplate;
	}
	public void setDestination(Destination destination) {
		this.destination = destination;
	}
	/**
	 * 接收訊息的方法
	 */
	public void receiveMsg(){
		//接收來自於目的站的資訊
		TextMessage message = (TextMessage) jmsTemplate.receive(destination);
		try {
			System.out.println(message.getText());
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}
	
}
以及applicationContext.xml 中配置管理生產者 和消費者:
<!-- 配置點到點的 消費者(Customer) 和生產者(Producer) -->
    	<bean id="queueProducer" class="com.deppon.activemq.queue.producer.QueueProducer">
    		<property name="jmsTemplate" ref="jmsQueueTemplate"/>
    		<property name="destination" ref="queueDest"/>
    		<!--<property name="destination" ref="queueCustomerListenerDest"/> -->
    	</bean>
    	<bean id="queueCustomer" class="com.deppon.activemq.queue.customer.QueueCustomer">
    		<property name="jmsTemplate" ref="jmsQueueTemplate"/>
    		<property name="destination" ref="queueDest"/>
    	</bean>
③.對於topic主題模式的 我們要建立他的釋出者和訂閱者:

TopicPublisher.java

/**
 * 釋出/訂閱 模式的 釋出端
 * @author leo
 *
 */
public class TopicPublisher {
	 //jms模板
	 private JmsTemplate jmsTemplate;
	 //目的站
	 private Destination destination;
	 
	 public void setDestination(Destination destination) {
		this.destination = destination;
	}
	 public void setJmsTemplate(JmsTemplate jmsTemplate) {
		this.jmsTemplate = jmsTemplate;
	}
	/**
	 * 傳送訊息的方法 
	 */
	public void sendTopicMsg(){
		//訊息構造器
		MessageCreator creator = new MessageCreator() {
			
			@Override
			public Message createMessage(Session session) throws JMSException {
				//建立訊息
				TextMessage message = session.createTextMessage();
				message.setText("傳送topic型別的訊息-leo");
				return message;
			}
		};
		//傳送訊息
		jmsTemplate.send(destination, creator);
	} 
}
TopicSubscriber.java
/**
 * topic的  訂閱端
 * @author leo
 *
 */
public class TopicSubscriber {
		//jms模板
		 private JmsTemplate jmsTemplate;
		 //目的站
		 private Destination destination;
		 
		 public void setDestination(Destination destination) {
			this.destination = destination;
		}
		 public void setJmsTemplate(JmsTemplate jmsTemplate) {
			this.jmsTemplate = jmsTemplate;
		}
		 /**
		  * 接收訊息
		  */
		 public  void receiveTopicMsg(){
			 TextMessage message =(TextMessage) jmsTemplate.receive(destination);	 
			 try {
				System.out.println("接收topic訊息:"+message.getText());
			} catch (JMSException e) {
				e.printStackTrace();
			}
			 
		 }
}
以及在applicationContext.xml 配置檔案中關於訂閱\釋出的管理
<!-- 配置pub/sub的 釋出者(subscriber) 和 訂閱者(publisher) -->
    	<bean id="topicPublisher" class="com.deppon.activemq.topic.publisher.TopicPublisher">
    		<property name="jmsTemplate" ref="jmsTopicTemplate"/>
    		<property name="destination" ref="topicDest"/>
    		<!-- <property name="destination" ref="topicListenerDest"/> -->
    	</bean>
    	<bean id="topicSubscriber" class="com.deppon.activemq.topic.subscriber.TopicSubscriber">
    		<property name="jmsTemplate" ref="jmsTopicTemplate"/>
    		<property name="destination" ref="topicDest"/>
    	</bean>
 那兩個模式的關於spring 整合activemq 實現jms訊息服務的簡單demo 就完成了。接下來寫兩個測試測試下兩種方式,看是訊息傳遞是否成功。值得注意的是:activemq 服務一定要記得開啟。

生產者測試類:

/**
 * 佇列--生產者測試類
 * @author leo
 *
 */
public class TestProducer {
	
	  	private static ApplicationContext appContext = new ClassPathXmlApplicationContext( "/com/deppon/activemq/META-INF/ds-spring.xml");  
	   
	    /** 
	     * @param args 
	     */  
	    public static void main(String[] args) {  
	    	//獲取生產者
	    	QueueProducer producer =(QueueProducer) appContext.getBean("queueProducer");
	    	//傳送訊息
	    	producer.sendMsg();
	    }  
}

消費者測試類:

/**
 * 消費者測試類
 * @author leo
 *
 */
public class TestCustomer {
	
	private static ApplicationContext appContext = new ClassPathXmlApplicationContext( "/com/deppon/activemq/META-INF/ds-spring.xml");  
	
	/**
	 * @param args
	 */
	public static void main(String[] args) {
		//消費者
		QueueCustomer customer = (QueueCustomer) appContext.getBean("queueCustomer");
		//開啟接收主執行緒
		customer.receiveMsg();
	}

}
執行生產者測試類,發現active上已經有新的佇列,然後再執行消費者測試類,發現訊息被取到。


同樣寫個topic 的pub\sub測試類,這邊要注意,我們之前說過topic模式和queue是有區別的,所以topic 要先執行訂閱者測試類,然後再執行釋出者測試類 才可以收到訊息。

(3)之前我們講的這些直接使用spring給的JMS template都是同步的收發訊息 ,可是spring給我們提供另外一種的jms訊息傳遞方式:message listener container,這個就是用於非同步收發訊息方式的。就是寫一個實現MessageListener介面的類。這樣消費者可以時時接受到生產者的訊息,訂閱者可以時時接受到釋出者的訊息.不用迴圈接受(非同步接收訊息)。
 ①首先我們要先實現訊息監聽介面的消費者實現類:

/**
 * 消費者Customer實現text訊息監聽,實現(非同步)時時接收
 * @author leo
 *
 */
public class QueueCustomerListener implements MessageListener{

	@Override
	public void onMessage(Message message) {
		TextMessage textMessage =null;
			try {
				//然後型別是屬於textMessage
				if (message instanceof TextMessage) {
					textMessage =(TextMessage)message;
					System.out.println("佇列 MessageListener收到資訊:"+textMessage.getText());
				}
			} catch (JMSException e) {
				e.printStackTrace();
			}
	}

}
以及applicationContext.xml 配置檔案中需要配置上相應的監聽管理 以及要改掉生產者中所對應的佇列目的站:
	
    	<bean id="queueCustomerListener" class="com.deppon.activemq.queue.linsteners.QueueCustomerListener"></bean>
    	<!-- 新建一個佇列, -->
		<bean id="queueCustomerListenerDest" class="org.apache.activemq.command.ActiveMQQueue" >
			<constructor-arg index="0" value="new-queue"/>
		</bean>
		<!-- 訊息監聽容器 -->
		<bean id="messageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
			<property name="destination" ref="queueCustomerListenerDest"/>
			<property name="connectionFactory" ref="connectionFactory"/>
			<property name="messageListener" ref="queueCustomerListener"/>
		</bean>
這會不需要之前的那個消費者類:只要修改掉生產者中的目的站就可以
<bean id="queueProducer" class="com.deppon.activemq.queue.producer.QueueProducer">
    		<property name="jmsTemplate" ref="jmsQueueTemplate"/>
    		<!-- <property name="destination" ref="queueDest"/> -->
    		<property name="destination" ref="queueCustomerListenerDest"/>
    	</bean>
②.對於topic主題模式的,也是一樣:
/**
 * 訂閱者Subscriber實現text訊息監聽,實現(非同步)時時接收
 * @author leo
 *
 */
public class TopicSubscriberListener implements MessageListener{

	@Override
	public void onMessage(Message message) {
		TextMessage textMessage =(TextMessage)message;
		try {
			System.out.println("接收實現MessageListener的topic監聽:"+textMessage.getText());
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

}
以及applicationContext.xml 配置檔案中需要配置上相應的監聽管理 以及要改掉生髮布者中所對應的主題目的站:
<!-- 訂閱監聽 -->
		<bean id="topicSubscriberListener" class="com.deppon.activemq.topic.listeners.TopicSubscriberListener"></bean>
		<!-- 新建一個主題(平臺) -->
		<bean id="topicListenerDest" class="org.apache.activemq.command.ActiveMQTopic">
			<!-- 設定佇列名稱 -->
			<constructor-arg index="0" value="new-topic"/>
		</bean>
		<bean id="myTopicListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
			<property name="destination" ref="topicListenerDest"/>
			<property name="connectionFactory" ref="connectionFactory"/>
			<property name="messageListener" ref="topicSubscriberListener"/>
			<!-- 配置是否為釋出訂閱者模式,預設為false -->  
        	<property name="pubSubDomain" value="true"/>  
		</bean>
釋出者中原來的目的站,要改成新的主題目的站:
<bean id="topicPublisher" class="com.deppon.activemq.topic.publisher.TopicPublisher">
    		<property name="jmsTemplate" ref="jmsTopicTemplate"/>
    		<!-- <property name="destination" ref="topicDest"/> -->
    		<property name="destination" ref="topicListenerDest"/>
    	</bean>
③然後啟動一個之前寫的那個生產者測試類,發現執行生產者測試類後 不用再執行消費者測試類就可以收到訊息。同樣topic 也是如此。