1. 程式人生 > >RabbitMQ系列之四 Spring RabbitMQ整合

RabbitMQ系列之四 Spring RabbitMQ整合

本文將會詳細介紹Spring和RabbitMQ整合,其中主要介紹路由模式(Routing)其他模式大體差不多就不一一介紹
專案git地址:https://github.com/gitcaiqing/SpringRabbitMQ
1.專案結構
在這裡插入圖片描述
2.建立Maven專案pom.xml引入關鍵jar包,下文中有些jar包用不到可刪減

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <spring.version>4.1.4.RELEASE</spring.version>
	<jackson.version>2.5.0</jackson.version>
  </properties>
  <dependencies>
  	<dependency>
		<groupId>com.rabbitmq</groupId>
		<artifactId>amqp-client</artifactId>
		<version>3.4.1</version>
	</dependency>
	<dependency>
		<groupId>org.slf4j</groupId>
		<artifactId>slf4j-log4j12</artifactId>
		<version>1.7.7</version>
	</dependency>
	<dependency>
		<groupId>org.apache.commons</groupId>
		<artifactId>commons-lang3</artifactId>
		<version>3.3.2</version>
	</dependency>
	<dependency>
       	<groupId>org.springframework.amqp</groupId>
       	<artifactId>spring-rabbit</artifactId>
       	<version>1.4.0.RELEASE</version>
    </dependency> 
   	<!-- spring核心依賴 -->
   	<dependency>
		<groupId>org.springframework</groupId>
		<artifactId>spring-core</artifactId>
		<version>${spring.version}</version>
	</dependency>
	<dependency>
		<groupId>org.springframework</groupId>
		<artifactId>spring-beans</artifactId>
		<version>${spring.version}</version>
	</dependency>
	<dependency>
		<groupId>org.springframework</groupId>
		<artifactId>spring-context</artifactId>
		<version>${spring.version}</version>
	</dependency>
	<dependency>
		<groupId>org.springframework</groupId>
		<artifactId>spring-tx</artifactId>
		<version>${spring.version}</version>
	</dependency>
	<dependency>
		<groupId>org.springframework</groupId>
		<artifactId>spring-web</artifactId>
		<version>${spring.version}</version>
	</dependency>
	<dependency>
		<groupId>org.springframework</groupId>
		<artifactId>spring-webmvc</artifactId>
		<version>${spring.version}</version>
	</dependency>
	<dependency>
		<groupId>org.springframework</groupId>
		<artifactId>spring-jdbc</artifactId>
		<version>${spring.version}</version>
	</dependency>
	<dependency>
		<groupId>org.springframework</groupId>
		<artifactId>spring-test</artifactId>
		<version>${spring.version}</version>
		<scope>test</scope>
	</dependency>
	<dependency>
    	<groupId>org.springframework</groupId>
    	<artifactId>spring-context-support</artifactId>
    	<version>${spring.version}</version>
	</dependency>
  </dependencies>
  <build>
  	<plugins>
  		<plugin>
  			<artifactId>maven-war-plugin</artifactId>
  			<configuration>
  				<version>3.0</version>
  			</configuration>
  		</plugin>
  	</plugins>
  </build>
</project>

3.RabbitMQ連線配置config->RabbitMQ.properties

rmq.host=127.0.0.1
rmq.port=5672
rmq.user=guest
rmq.password=guest
rmq.channelCacheSize=25
rmq.virtual=/

4.Spring RabbitMQ整合配置xml檔案
此檔案中涉及到的一些物件,在下文中有定義

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
	xmlns:rabbit="http://www.springframework.org/schema/rabbit"
	xsi:schemaLocation="http://www.springframework.org/schema/rabbit 
	http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
	http://www.springframework.org/schema/beans 
	http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
	http://www.springframework.org/schema/context 
	http://www.springframework.org/schema/context/spring-context-4.1.xsd">
	<description>Spring RabbitMQ 路由模式(Routing)配置</description>
	
	<!--引入配置屬性檔案 -->
	<context:property-placeholder location="classpath:config/*.properties" />
	
 	<!-- 配置RabbitMQ連線 -->
 	<!-- channel-cache-size,channel的快取數量,預設值為25 -->
 	<!-- cache-mode,快取連線模式,預設值為CHANNEL(單個connection連線,連線之後關閉,自動銷燬) -->
 	<rabbit:connection-factory id="connectionFactory" host="${rmq.host}" port="${rmq.port}"
 		username="${rmq.user}" password="${rmq.password}" virtual-host="${rmq.virtual}" 
 		channel-cache-size="${rmq.channelCacheSize}" cache-mode="CHANNEL"/>
 	<rabbit:admin connection-factory="connectionFactory"/>
 
 	<!--
 		定義訊息佇列
 		durable:是否持久化,如果想在RabbitMQ退出或崩潰的時候,不失去queue和訊息,需要同時標誌佇列(queue)
 		和交換機(exchange)持久化,即rabbit:queue標籤和rabbit:direct-exchange中的durable=true,而訊息(message)
 		預設是持久化的,可以看類org.springframework.amqp.core.MessageProperties中的屬性
 		public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;
 		auto_delete: 當所有消費客戶端連線斷開後,是否自動刪除佇列 
 		exclusive: 僅建立者可以使用的私有佇列,斷開後自動刪除;
 	-->
 	<rabbit:queue id="queue" name="SpringRabbit-Routing-Queue1" durable="true" auto-delete="false" exclusive="false"/>
 	
 	<!--
 		繫結佇列
 		rabbitmq的exchangeType常用的三種模式:direct,fanout,topic三種,此處為direct模式,即rabbit:direct-exchange標籤,
 		Direct交換器很簡單,如果是Direct型別,就會將訊息中的RoutingKey與該Exchange關聯的所有Binding中的BindingKey進行比較,如果相等,
 		則傳送到該Binding對應的Queue中。有一個需要注意的地方:如果找不到指定的exchange,就會報錯。
 		但routing key找不到的話,不會報錯,這條訊息會直接丟失,所以此處要小心,
 	    auto-delete:自動刪除,如果為Yes,則該交換機所有佇列queue刪除後,自動刪除交換機,預設為false 
 	-->
 	<rabbit:direct-exchange name="SpringRabbit-Direct-Exchange" auto-declare="true" auto-delete="false">
 		<rabbit:bindings>
 			<rabbit:binding queue="queue" key="info"></rabbit:binding>
 		</rabbit:bindings>
 	</rabbit:direct-exchange>
 	
 	<!-- spring template宣告 -->
 	<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="SpringRabbit-Direct-Exchange"/>

	<!-- 生產者 -->
	<bean id="producter" class="com.rabbitmq.producter.Producter">
		<constructor-arg name="rabbitTemplate" ref="rabbitTemplate"/>
		<constructor-arg name="routekey" value="info"/>
	</bean>
	
	<!-- 消費者  -->
 	<bean id="consumer" class="com.rabbitmq.consumer.Consumer"/>
	<!-- 佇列監聽-->
	<!-- acknowledge:auto 自動確認(預設), manual手動確認 -->
 	<rabbit:listener-container connection-factory="connectionFactory">
 		<rabbit:listener queues="queue" ref="consumer"/>
 	</rabbit:listener-container>
 	
</beans>

**5.**生產者

package com.rabbitmq.producter;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

//生產者
public class Producter {

	private RabbitTemplate rabbitTemplate;
	private String routekey;
	
	public Producter(RabbitTemplate rabbitTemplate, String routekey) {
		this.rabbitTemplate = rabbitTemplate;
		this.routekey = routekey;
	}
	
	public void sendMessage(String message) {
		rabbitTemplate.convertAndSend(routekey, message);
	}
}

6.消費者

package com.rabbitmq.consumer;
import java.io.UnsupportedEncodingException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
/**
 * 消費者
 * 實現 MessageListener或ChannelAwareMessageListener(需手動確認的實現此介面)
 */
public class Consumer implements MessageListener{

	public void onMessage(Message message) {
		String msg = null;
		try {
			msg = new String(message.getBody(),"UTF-8");
		} catch (UnsupportedEncodingException e) {
			e.printStackTrace();
		}
		System.out.println("消費者消費:"+msg);
	}
}

7.路由模式測試

package com.rabbitmq.controller;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.rabbitmq.producter.Producter;

//路由模式測試
public class RoutingTest {
	public static void main(String[] args) {
		AbstractApplicationContext  ctx = new ClassPathXmlApplicationContext("spring-rabbitmq-routing.xml");
		Producter routingProducter = ctx.getBean(Producter.class);
		String message = "spring rabit routing hello!";
		routingProducter.sendMessage(message);
		ctx.close();
	}
}

8.其他模式
其他模式就不一一實現,這裡簡單的把釋出訂閱模式和萬用字元模式和Spring整合配置檔案介紹下。釋出定義模式沒有去掉路由鍵,萬用字元模式則是講路由快取類似正則表示式去匹配即可
8.1釋出訂閱模式配置檔案

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
	xmlns:rabbit="http://www.springframework.org/schema/rabbit"
	xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
	http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
	http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd">
	<description>Spring RabbitMQ 釋出訂閱模式(publish_subscrible)配置</description>
	
	<!--引入配置屬性檔案 -->
	<context:property-placeholder location="classpath:config/*.properties" />
	
 	<!-- 配置RabbitMQ連線 -->
 	<!-- channel-cache-size,channel的快取數量,預設值為25 -->
 	<!-- cache-mode,快取連線模式,預設值為CHANNEL(單個connection連線,連線之後關閉,自動銷燬) -->
 	<rabbit:connection-factory id="connectionFactory" host="${rmq.host}" port="${rmq.port}"
 		username="${rmq.user}" password="${rmq.password}" virtual-host="${rmq.virtual}" 
 		channel-cache-size="${rmq.channelCacheSize}" cache-mode="CHANNEL"/>
 	<rabbit:admin connection-factory="connectionFactory"/>
 
 	<!--
 		定義訊息佇列
 		durable:是否持久化,如果想在RabbitMQ退出或崩潰的時候,不失去queue和訊息,需要同時標誌佇列(queue)
 		和交換機(exchange)持久化,即rabbit:queue標籤和rabbit:direct-exchange中的durable=true,而訊息(message)
 		預設是持久化的,可以看類org.springframework.amqp.core.MessageProperties中的屬性
 		public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;
 		auto_delete: 當所有消費客戶端連線斷開後,是否自動刪除佇列 
 		exclusive: 僅建立者可以使用的私有佇列,斷開後自動刪除;
 	-->
 	<rabbit:queue id="queue" name="SpringRabbit-PublishScrible-Queue" durable="true" auto-delete="false" exclusive="false"/>
 	
 	<!--
 		繫結佇列
 		rabbitmq的exchangeType常用的三種模式:direct,fanout,topic三種,此處為direct模式,即rabbit:direct-exchange標籤,
 		Direct交換器很簡單,如果是Direct型別,就會將訊息中的RoutingKey與該Exchange關聯的所有Binding中的BindingKey進行比較,如果相等,
 		則傳送到該Binding對應的Queue中。有一個需要注意的地方:如果找不到指定的exchange,就會報錯。
 		但routing key找不到的話,不會報錯,這條訊息會直接丟失,所以此處要小心,
 	    auto-delete:自動刪除,如果為Yes,則該交換機所有佇列queue刪除後,自動刪除交換機,預設為false 
 	-->
 	<rabbit:fanout-exchange name="SpringRabbit-Fanout-Exchange" auto-declare="true" auto-delete="false">
 		<rabbit:bindings>
 			<rabbit:binding queue="queue"></rabbit:binding>
 		</rabbit:bindings>
 	</rabbit:fanout-exchange>
 	
 	<!-- spring template宣告 -->
 	<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="SpringRabbit-Fanout-Exchange"/>

	<!-- 生產者 -->
	<bean id="producter" class="com.rabbitmq.producter.Producter">
		<constructor-arg name="rabbitTemplate" ref="rabbitTemplate"/>
		<constructor-arg name="routekey" value=""/>
	</bean>
	
	<!-- 消費者  -->
 	<bean id="consumer" class="com.rabbitmq.consumer.Consumer"/>
	<!-- 佇列監聽-->
	<!-- acknowledge:auto 自動確認(預設), manual手動確認 -->
 	<rabbit:listener-container connection-factory="connectionFactory">
 		<rabbit:listener queues="queue" ref="consumer"/>
 	</rabbit:listener-container>
 	
</beans>

8.2萬用字元模式配置檔案

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
	xmlns:rabbit="http://www.springframework.org/schema/rabbit"
	xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
	http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
	http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd">
	<description>Spring RabbitMQ 萬用字元模式(Topics)配置</description>
	
	<!--引入配置屬性檔案 -->
	<context:property-placeholder location="classpath:config/*.properties" />
	
 	<!-- 配置RabbitMQ連線 -->
 	<!-- channel-cache-size,channel的快取數量,預設值為25 -->
 	<!-- cache-mode,快取連線模式,預設值為CHANNEL(單個connection連線,連線之後關閉,自動銷燬) -->
 	<rabbit:connection-factory id="connectionFactory" host="${rmq.host}" port="${rmq.port}"
 		username="${rmq.user}" password="${rmq.password}" virtual-host="${rmq.virtual}" 
 		channel-cache-size="${rmq.channelCacheSize}" cache-mode="CHANNEL"/>
 	<rabbit:admin connection-factory="connectionFactory"/>
 
 	<!--
 		定義訊息佇列
 		durable:是否持久化,如果想在RabbitMQ退出或崩潰的時候,不失去queue和訊息,需要同時標誌佇列(queue)
 		和交換機(exchange)持久化,即rabbit:queue標籤和rabbit:direct-exchange中的durable=true,而訊息(message)
 		預設是持久化的,可以看類org.springframework.amqp.core.MessageProperties中的屬性
 		public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;
 		auto_delete: 當所有消費客戶端連線斷開後,是否自動刪除佇列 
 		exclusive: 僅建立者可以使用的私有佇列,斷開後自動刪除;
 	-->
 	<rabbit:queue id="queue" name="SpringRabbit-Topics-Queue" durable="true" auto-delete="false" exclusive="false"/>
 	
 	<!--
 		繫結佇列
 		rabbitmq的exchangeType常用的三種模式:direct,fanout,topic三種,此處為direct模式,即rabbit:direct-exchange標籤,
 		Direct交換器很簡單,如果是Direct型別,就會將訊息中的RoutingKey與該Exchange關聯的所有Binding中的BindingKey進行比較,如果相等,
 		則傳送到該Binding對應的Queue中。有一個需要注意的地方:如果找不到指定的exchange,就會報錯。
 		但routing key找不到的話,不會報錯,這條訊息會直接丟失,所以此處要小心,
 	    auto-delete:自動刪除,如果為Yes,則該交換機所有佇列queue刪除後,自動刪除交換機,預設為false 
 	-->
 	<rabbit:topic-exchange name="SpringRabbit-Topic-Exchange" auto-declare="true" auto-delete="false">
 		<rabbit:bindings>
 			<!-- 
 				符號“#”匹配一個或多個詞,符號“*”僅匹配一個詞
 				如:因此“china.#”能夠匹配到“china.news.info”,但是“china.*”只會匹配到“china.news”
 			 -->
 			<rabbit:binding queue="queue" pattern="china.#"></rabbit:binding>
 		</rabbit:bindings>
 	</rabbit:topic-exchange>
 	
 	<!-- spring template宣告 -->
 	<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="SpringRabbit-Topic-Exchange"/>

	<!-- 生產者 -->
	<bean id="producter" class="com.rabbitmq.producter.Producter">
		<constructor-arg name="rabbitTemplate" ref="rabbitTemplate"/>
		<constructor-arg name="routekey" value="china.news"/>
	</bean>
	
	<!-- 消費者  -->
 	<bean id="consumer" class="com.rabbitmq.consumer.Consumer"/>
	<!-- 佇列監聽-->
	<!-- acknowledge:auto 自動確認(預設), manual手動確認 -->
 	<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
 		<rabbit:listener queues="queue" ref="consumer"/>
 	</rabbit:listener-container>
 	
</beans>