1. 程式人生 > >spring整合JMS一同步收發訊息(基於ActiveMQ的實現)

spring整合JMS一同步收發訊息(基於ActiveMQ的實現)

1. 安裝ActiveMQ

注意:JDK版本需要1.7及以上才行

bin目錄結構如下:


如果我們是32位的機器,就雙擊win32目錄下的activemq.bat,如果是64位機器,則雙擊win64目錄下的activemq.bat,執行結果如下:

啟動成功!成功之後在瀏覽器輸入http://127.0.0.1:8161/地址,可以看到ActiveMQ的管理頁面,使用者名稱和密碼預設都是admin,如下:

2. 新建一個Maven工程,並配置pom檔案如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.chhliu.myself</groupId>
	<artifactId>activemq_start</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>activemq_start</name>
	<url>http://maven.apache.org</url>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<spring-version>3.2.5.RELEASE</spring-version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.10</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-context</artifactId>
			<version>${spring-version}</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-jms</artifactId>
			<version>${spring-version}</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-test</artifactId>
			<version>${spring-version}</version>
		</dependency>
		<dependency>
			<groupId>javax.annotation</groupId>
			<artifactId>jsr250-api</artifactId>
			<version>1.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-all</artifactId>
			<version>5.13.3</version>
		</dependency>
		<dependency>
			<groupId>org.apache.commons</groupId>
			<artifactId>commons-pool2</artifactId>
			<version>2.0</version>
		</dependency>
	</dependencies>
</project>
3. 配置連線工廠(ConnectionFactory)

Spring給我們提供瞭如下的連線工廠:


其中SingleConnectionFactory保證每次返回的都是同一個連線,CachingConnectionFactory繼承了SingleConnectionFactory,在保證同一連線的同時,增加了快取的功能,可以快取Session以及生產者,消費者。當然,JMS提供的連線工廠只是用來實現管理的,並不是真正連線MQ的,真正的連線工廠需要具體的MQ廠商提供,下面我們以ActiveMQ為例來說明,配置如下:

<!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 -->
	<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
		<property name="brokerURL" value="tcp://localhost:61616" />
	</bean>
<bean id="connectionFactory"
		class="org.springframework.jms.connection.SingleConnectionFactory">
		<property name="targetConnectionFactory" ref="targetConnectionFactory" />
	</bean>

為了減少我們連線的資源消耗,ActiveMQ為我們提供了一個連線工廠管理池--PooledConnectionFactory,通過連線工廠池,可以將Connection,Session等都放在池裡面,用的時候直接返回池裡面的內容,無需臨時建立連線,節約開銷。配置如下:

<!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 -->
	<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
		<property name="brokerURL" value="tcp://localhost:61616" />
	</bean>

	<!-- 通過往PooledConnectionFactory注入一個ActiveMQConnectionFactory可以用來將Connection,Session和MessageProducer池化這樣可以大大減少我們的資源消耗, -->
	<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
		<property name="connectionFactory" ref="targetConnectionFactory" />
		<property name="maxConnections" value="10" />
	</bean>

	<bean id="connectionFactory"
		class="org.springframework.jms.connection.SingleConnectionFactory">
		<property name="targetConnectionFactory" ref="pooledConnectionFactory" />
	</bean>
4. 配置JmsTemplate

配置好連線工廠之後,就需要配置JMS的JmsTemplate,JmsTemplate的作用和JdbcTemplate類似,我們傳送和接收訊息,都是通過JmsTemplate來實現的,配置如下:

<!-- 配置生產者:配置好ConnectionFactory之後我們就需要配置生產者。生產者負責產生訊息併發送到JMS伺服器,這通常對應的是我們的一個業務邏輯服務實現類。 但是我們的服務實現類是怎麼進行訊息的傳送的呢?這通常是利用Spring為我們提供的JmsTemplate類來實現的, 所以配置生產者其實最核心的就是配置進行訊息傳送的JmsTemplate。對於訊息傳送者而言,它在傳送訊息的時候要知道自己該往哪裡發,	為此,我們在定義JmsTemplate的時候需要往裡面注入一個Spring提供的ConnectionFactory物件 -->
	<!-- Spring提供的JMS工具類,它可以進行訊息傳送、接收等 -->
	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory物件 -->
		<property name="connectionFactory" ref="connectionFactory" />
	</bean>
5. 生產者實現

配置完這些之後,我們就可以寫程式碼實現生產者和消費者了,生產者主要用來生產訊息,並向目的佇列中推送訊息,介面定義如下:

public interface ProducerService {
	void sendMessage(Destination destination, final String message);
}

實現類程式碼如下:

@Service("producerServiceImpl")
public class ProducerServiceImpl implements ProducerService {
	
	/**
	 * 注入JmsTemplate
	 */
	@Resource(name="jmsTemplate")
	private JmsTemplate jTemplate;

	/**
	 * attention:
	 * Details:傳送訊息
	 * @author chhliu
	 * 建立時間:2016-7-28 下午2:33:14
	 * @param destination
	 * @param message
	 */
	@Override
	public void sendMessage(Destination receivedestination, final String message) {
		
		System.out.println("================生產者建立了一條訊息==============");
		jTemplate.send(receivedestination, new MessageCreator() {
			
			@Override
			public Message createMessage(Session session) throws JMSException {
				return session.createTextMessage("hello acticeMQ:"+message);
			}
		});
	}
}
6. 消費者實現

假設生產者已經建立了一條訊息,並推送到了對應的佇列中,消費者需要從這個佇列中取出訊息,並同時回覆一條報文,自己已經收到了這條訊息,為了測試回覆報文的功能,我們下面會將回復報文放到另一個佇列中,此例使用同步接收訊息的方式,而不是非同步監聽的方式實現,介面定義如下:

public interface ConsumerService {
	String receiveMessage(Destination destination, Destination replyDestination);
}

實現類程式碼如下:

@Service("consumerServiceImpl")
public class ConsumerServiceImpl implements ConsumerService {
	
	/**
	 * 注入JmsTemplate
	 */
	@Resource(name="jmsTemplate")
	private JmsTemplate jTemplate;
	
	/**
	 * attention:
	 * Details:接收訊息,同時回覆訊息
	 * @author chhliu
	 * 建立時間:2016-7-28 下午2:39:45
	 * @param destination
	 * @return
	 */
	@Override
	public String receiveMessage(Destination destination, Destination replyDestination) {
		/**
		 * 接收訊息佇列中的訊息
		 */
		Message message = jTemplate.receive(destination);
		try {
			/**
			 * 此處為了更好的容錯性,可以使用instanceof來判斷下訊息型別
			 */
			if(message instanceof TextMessage){
				String receiveMessage =  ((TextMessage) message).getText();
				System.out.println("收到生產者的訊息:"+receiveMessage);
				/**
				 * 收到訊息之後,將回復報文放到回覆佇列裡面去
				 */
				jTemplate.send(replyDestination, new MessageCreator() {
					
					@Override
					public Message createMessage(Session session) throws JMSException {
						return session.createTextMessage("消費者已經收到生產者的訊息了,這是一條確認報文!");
					}
				});
              return receiveMessage;
			}
		} catch (JMSException e) {
			e.printStackTrace();
		}
		return "";
	}
}
生產者和消費者實現之後,我們要做的就是配置隊列了,下面給出專案完整的配置檔案:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
	xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
	xmlns:p="http://www.springframework.org/schema/p" xmlns:cache="http://www.springframework.org/schema/cache"
	xmlns:jpa="http://www.springframework.org/schema/data/jpa"

	xsi:schemaLocation="http://www.springframework.org/schema/beans   
          http://www.springframework.org/schema/beans/spring-beans-3.2.xsd   
          http://www.springframework.org/schema/context   
          http://www.springframework.org/schema/context/spring-context-3.2.xsd   
          http://www.springframework.org/schema/aop   
          http://www.springframework.org/schema/aop/spring-aop-3.2.xsd   
          http://www.springframework.org/schema/tx    
          http://www.springframework.org/schema/tx/spring-tx-3.2.xsd
          http://www.springframework.org/schema/cache 
          http://www.springframework.org/schema/cache/spring-cache-3.2.xsd
          http://www.springframework.org/schema/data/jpa
          http://www.springframework.org/schema/data/jpa/spring-jpa-1.3.xsd">

	<!-- 掃描註解包 -->
	<context:annotation-config />
	<context:component-scan base-package="com.chhliu.myself.activemq.start"></context:component-scan>
	<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
		<property name="brokerURL" value="tcp://localhost:61616" />
	</bean>
	<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
		<property name="connectionFactory" ref="targetConnectionFactory" />
		<property name="maxConnections" value="10" />
	</bean>
	<bean id="connectionFactory"	class="org.springframework.jms.connection.SingleConnectionFactory">
		<property name="targetConnectionFactory" ref="pooledConnectionFactory" />
	</bean>
	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="connectionFactory" />
	</bean>
	<!-- 在真正利用JmsTemplate進行訊息傳送的時候,我們需要知道訊息傳送的目的地,即destination。 在Jms中有一個用來表示目的地的Destination介面,它裡面沒有任何方法定義,只是用來做一個標識而已。當我們在使用JmsTemplate進行訊息傳送時沒有指定destination的時候將使用預設的Destination。 預設Destination可以通過在定義jmsTemplate bean物件時通過屬性defaultDestination或defaultDestinationName來進行注入, defaultDestinationName對應的就是一個普通字串 -->
	<!--這個是佇列目的地,點對點的 -->
	<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg>
			<value>NTF_MOCK_INPUT</value>
		</constructor-arg>
	</bean>
    <!--這個是回覆佇列,點對點的 -->
	<bean id="responseQueue" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg>
			<value>NTF_MOCK_OUTPUT</value>
		</constructor-arg>
	</bean>
</beans>

到這裡,所有的程式碼和配置檔案就都整好了,下面就是進行測試,測試程式碼如下:

生產者測試程式碼:
package com.chhliu.myself.activemq.start.sync;

import javax.annotation.Resource;
import javax.jms.Destination;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:applicationContext.xml" })
public class SyncProducerActiveMQTest {
	
	@Resource(name="producerServiceImpl")
	private ProducerService pService;
	
	@Resource(name="queueDestination")
	private Destination receiveQueue;
	
	@Test
	public void producerTest(){
        pService.sendMessage(receiveQueue, "my name is chhliu!");
	}
}

消費者測試程式碼:
package com.chhliu.myself.activemq.start.sync;

import javax.annotation.Resource;
import javax.jms.Destination;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:applicationContext.xml" })
public class SyncConsumerActiveMQTest {
	
	@Resource(name="consumerServiceImpl")
	private ConsumerService cService;
	
	@Resource(name="queueDestination")
	private Destination receiveQueue;
	
	@Resource(name="responseQueue")
	private Destination replyQueue;
	
	@Test
	public void producerTest(){
		String result = cService.receiveMessage(receiveQueue, replyQueue);
		System.out.println(result);
	}
}
測試結果如下:
<span style="background-color: rgb(51, 255, 51);">生產者測試結果:
================生產者建立了一條訊息==============
消費者測試結果:
收到生產者的訊息:hello acticeMQ:my name is chhliu!
hello acticeMQ:my name is chhliu!</span>

再來看下ActiveMQ的管理頁面的結果:

從管理頁面中可以看到,生產者生產了訊息,並且入隊列了,同時消費者也消費了訊息,並將回覆訊息放到了回覆佇列中,測試成功。

但是這種同步取訊息的方式有個缺點,每次只會取一條訊息消費,取完之後就會一直阻塞,下面來測試一下:首先讓生產者再生產5條訊息,然後執行消費者程式,發現會只消費一條訊息,除非我們在消費者程式裡面加while(true),一直輪詢佇列,這種實現方式不僅耗記憶體,效率也不是很高,後面,我們會對這種方式進行改進,使用非同步監聽模式,測試效果如下:

生產者建立了5條訊息:

=======生產者建立了一條訊息========

=======生產者建立了一條訊息========

=======生產者建立了一條訊息========

=======生產者建立了一條訊息========

======生產者建立了一條訊息=========

ActiveMQ管理頁面如下:

消費者消費一條訊息:

收到生產者的訊息:hello acticeMQ:my name is chhliu!

hello acticeMQ:my name is chhliu!

消費者消費訊息後,ActiveMQ管理頁面如下:

從上面的對比中,我們可以看出來,同步模式下,消費者消費訊息時,是逐條消費,每次只消費一條訊息。