RabbitMQ系列之四 Spring RabbitMQ整合
阿新 • • 發佈:2018-12-01
本文將會詳細介紹Spring和RabbitMQ整合,其中主要介紹路由模式(Routing)其他模式大體差不多就不一一介紹
專案git地址:https://github.com/gitcaiqing/SpringRabbitMQ
1.專案結構
2.建立Maven專案pom.xml引入關鍵jar包,下文中有些jar包用不到可刪減
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <spring.version>4.1.4.RELEASE</spring.version> <jackson.version>2.5.0</jackson.version> </properties> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.4.1</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.7</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.3.2</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.4.0.RELEASE</version> </dependency> <!-- spring核心依賴 --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-beans</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-tx</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-web</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-webmvc</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jdbc</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>${spring.version}</version> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context-support</artifactId> <version>${spring.version}</version> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-war-plugin</artifactId> <configuration> <version>3.0</version> </configuration> </plugin> </plugins> </build> </project>
3.RabbitMQ連線配置config->RabbitMQ.properties
rmq.host=127.0.0.1
rmq.port=5672
rmq.user=guest
rmq.password=guest
rmq.channelCacheSize=25
rmq.virtual=/
4.Spring RabbitMQ整合配置xml檔案
此檔案中涉及到的一些物件,在下文中有定義
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd"> <description>Spring RabbitMQ 路由模式(Routing)配置</description> <!--引入配置屬性檔案 --> <context:property-placeholder location="classpath:config/*.properties" /> <!-- 配置RabbitMQ連線 --> <!-- channel-cache-size,channel的快取數量,預設值為25 --> <!-- cache-mode,快取連線模式,預設值為CHANNEL(單個connection連線,連線之後關閉,自動銷燬) --> <rabbit:connection-factory id="connectionFactory" host="${rmq.host}" port="${rmq.port}" username="${rmq.user}" password="${rmq.password}" virtual-host="${rmq.virtual}" channel-cache-size="${rmq.channelCacheSize}" cache-mode="CHANNEL"/> <rabbit:admin connection-factory="connectionFactory"/> <!-- 定義訊息佇列 durable:是否持久化,如果想在RabbitMQ退出或崩潰的時候,不失去queue和訊息,需要同時標誌佇列(queue) 和交換機(exchange)持久化,即rabbit:queue標籤和rabbit:direct-exchange中的durable=true,而訊息(message) 預設是持久化的,可以看類org.springframework.amqp.core.MessageProperties中的屬性 public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT; auto_delete: 當所有消費客戶端連線斷開後,是否自動刪除佇列 exclusive: 僅建立者可以使用的私有佇列,斷開後自動刪除; --> <rabbit:queue id="queue" name="SpringRabbit-Routing-Queue1" durable="true" auto-delete="false" exclusive="false"/> <!-- 繫結佇列 rabbitmq的exchangeType常用的三種模式:direct,fanout,topic三種,此處為direct模式,即rabbit:direct-exchange標籤, Direct交換器很簡單,如果是Direct型別,就會將訊息中的RoutingKey與該Exchange關聯的所有Binding中的BindingKey進行比較,如果相等, 則傳送到該Binding對應的Queue中。有一個需要注意的地方:如果找不到指定的exchange,就會報錯。 但routing key找不到的話,不會報錯,這條訊息會直接丟失,所以此處要小心, auto-delete:自動刪除,如果為Yes,則該交換機所有佇列queue刪除後,自動刪除交換機,預設為false --> <rabbit:direct-exchange name="SpringRabbit-Direct-Exchange" auto-declare="true" auto-delete="false"> <rabbit:bindings> <rabbit:binding queue="queue" key="info"></rabbit:binding> </rabbit:bindings> </rabbit:direct-exchange> <!-- spring template宣告 --> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="SpringRabbit-Direct-Exchange"/> <!-- 生產者 --> <bean id="producter" class="com.rabbitmq.producter.Producter"> <constructor-arg name="rabbitTemplate" ref="rabbitTemplate"/> <constructor-arg name="routekey" value="info"/> </bean> <!-- 消費者 --> <bean id="consumer" class="com.rabbitmq.consumer.Consumer"/> <!-- 佇列監聽--> <!-- acknowledge:auto 自動確認(預設), manual手動確認 --> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener queues="queue" ref="consumer"/> </rabbit:listener-container> </beans>
**5.**生產者
package com.rabbitmq.producter; import org.springframework.amqp.rabbit.core.RabbitTemplate; //生產者 public class Producter { private RabbitTemplate rabbitTemplate; private String routekey; public Producter(RabbitTemplate rabbitTemplate, String routekey) { this.rabbitTemplate = rabbitTemplate; this.routekey = routekey; } public void sendMessage(String message) { rabbitTemplate.convertAndSend(routekey, message); } }
6.消費者
package com.rabbitmq.consumer;
import java.io.UnsupportedEncodingException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
/**
* 消費者
* 實現 MessageListener或ChannelAwareMessageListener(需手動確認的實現此介面)
*/
public class Consumer implements MessageListener{
public void onMessage(Message message) {
String msg = null;
try {
msg = new String(message.getBody(),"UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
System.out.println("消費者消費:"+msg);
}
}
7.路由模式測試
package com.rabbitmq.controller;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import com.rabbitmq.producter.Producter;
//路由模式測試
public class RoutingTest {
public static void main(String[] args) {
AbstractApplicationContext ctx = new ClassPathXmlApplicationContext("spring-rabbitmq-routing.xml");
Producter routingProducter = ctx.getBean(Producter.class);
String message = "spring rabit routing hello!";
routingProducter.sendMessage(message);
ctx.close();
}
}
8.其他模式
其他模式就不一一實現,這裡簡單的把釋出訂閱模式和萬用字元模式和Spring整合配置檔案介紹下。釋出定義模式沒有去掉路由鍵,萬用字元模式則是講路由快取類似正則表示式去匹配即可
8.1釋出訂閱模式配置檔案
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd">
<description>Spring RabbitMQ 釋出訂閱模式(publish_subscrible)配置</description>
<!--引入配置屬性檔案 -->
<context:property-placeholder location="classpath:config/*.properties" />
<!-- 配置RabbitMQ連線 -->
<!-- channel-cache-size,channel的快取數量,預設值為25 -->
<!-- cache-mode,快取連線模式,預設值為CHANNEL(單個connection連線,連線之後關閉,自動銷燬) -->
<rabbit:connection-factory id="connectionFactory" host="${rmq.host}" port="${rmq.port}"
username="${rmq.user}" password="${rmq.password}" virtual-host="${rmq.virtual}"
channel-cache-size="${rmq.channelCacheSize}" cache-mode="CHANNEL"/>
<rabbit:admin connection-factory="connectionFactory"/>
<!--
定義訊息佇列
durable:是否持久化,如果想在RabbitMQ退出或崩潰的時候,不失去queue和訊息,需要同時標誌佇列(queue)
和交換機(exchange)持久化,即rabbit:queue標籤和rabbit:direct-exchange中的durable=true,而訊息(message)
預設是持久化的,可以看類org.springframework.amqp.core.MessageProperties中的屬性
public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;
auto_delete: 當所有消費客戶端連線斷開後,是否自動刪除佇列
exclusive: 僅建立者可以使用的私有佇列,斷開後自動刪除;
-->
<rabbit:queue id="queue" name="SpringRabbit-PublishScrible-Queue" durable="true" auto-delete="false" exclusive="false"/>
<!--
繫結佇列
rabbitmq的exchangeType常用的三種模式:direct,fanout,topic三種,此處為direct模式,即rabbit:direct-exchange標籤,
Direct交換器很簡單,如果是Direct型別,就會將訊息中的RoutingKey與該Exchange關聯的所有Binding中的BindingKey進行比較,如果相等,
則傳送到該Binding對應的Queue中。有一個需要注意的地方:如果找不到指定的exchange,就會報錯。
但routing key找不到的話,不會報錯,這條訊息會直接丟失,所以此處要小心,
auto-delete:自動刪除,如果為Yes,則該交換機所有佇列queue刪除後,自動刪除交換機,預設為false
-->
<rabbit:fanout-exchange name="SpringRabbit-Fanout-Exchange" auto-declare="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="queue"></rabbit:binding>
</rabbit:bindings>
</rabbit:fanout-exchange>
<!-- spring template宣告 -->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="SpringRabbit-Fanout-Exchange"/>
<!-- 生產者 -->
<bean id="producter" class="com.rabbitmq.producter.Producter">
<constructor-arg name="rabbitTemplate" ref="rabbitTemplate"/>
<constructor-arg name="routekey" value=""/>
</bean>
<!-- 消費者 -->
<bean id="consumer" class="com.rabbitmq.consumer.Consumer"/>
<!-- 佇列監聽-->
<!-- acknowledge:auto 自動確認(預設), manual手動確認 -->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener queues="queue" ref="consumer"/>
</rabbit:listener-container>
</beans>
8.2萬用字元模式配置檔案
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd">
<description>Spring RabbitMQ 萬用字元模式(Topics)配置</description>
<!--引入配置屬性檔案 -->
<context:property-placeholder location="classpath:config/*.properties" />
<!-- 配置RabbitMQ連線 -->
<!-- channel-cache-size,channel的快取數量,預設值為25 -->
<!-- cache-mode,快取連線模式,預設值為CHANNEL(單個connection連線,連線之後關閉,自動銷燬) -->
<rabbit:connection-factory id="connectionFactory" host="${rmq.host}" port="${rmq.port}"
username="${rmq.user}" password="${rmq.password}" virtual-host="${rmq.virtual}"
channel-cache-size="${rmq.channelCacheSize}" cache-mode="CHANNEL"/>
<rabbit:admin connection-factory="connectionFactory"/>
<!--
定義訊息佇列
durable:是否持久化,如果想在RabbitMQ退出或崩潰的時候,不失去queue和訊息,需要同時標誌佇列(queue)
和交換機(exchange)持久化,即rabbit:queue標籤和rabbit:direct-exchange中的durable=true,而訊息(message)
預設是持久化的,可以看類org.springframework.amqp.core.MessageProperties中的屬性
public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;
auto_delete: 當所有消費客戶端連線斷開後,是否自動刪除佇列
exclusive: 僅建立者可以使用的私有佇列,斷開後自動刪除;
-->
<rabbit:queue id="queue" name="SpringRabbit-Topics-Queue" durable="true" auto-delete="false" exclusive="false"/>
<!--
繫結佇列
rabbitmq的exchangeType常用的三種模式:direct,fanout,topic三種,此處為direct模式,即rabbit:direct-exchange標籤,
Direct交換器很簡單,如果是Direct型別,就會將訊息中的RoutingKey與該Exchange關聯的所有Binding中的BindingKey進行比較,如果相等,
則傳送到該Binding對應的Queue中。有一個需要注意的地方:如果找不到指定的exchange,就會報錯。
但routing key找不到的話,不會報錯,這條訊息會直接丟失,所以此處要小心,
auto-delete:自動刪除,如果為Yes,則該交換機所有佇列queue刪除後,自動刪除交換機,預設為false
-->
<rabbit:topic-exchange name="SpringRabbit-Topic-Exchange" auto-declare="true" auto-delete="false">
<rabbit:bindings>
<!--
符號“#”匹配一個或多個詞,符號“*”僅匹配一個詞
如:因此“china.#”能夠匹配到“china.news.info”,但是“china.*”只會匹配到“china.news”
-->
<rabbit:binding queue="queue" pattern="china.#"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- spring template宣告 -->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="SpringRabbit-Topic-Exchange"/>
<!-- 生產者 -->
<bean id="producter" class="com.rabbitmq.producter.Producter">
<constructor-arg name="rabbitTemplate" ref="rabbitTemplate"/>
<constructor-arg name="routekey" value="china.news"/>
</bean>
<!-- 消費者 -->
<bean id="consumer" class="com.rabbitmq.consumer.Consumer"/>
<!-- 佇列監聽-->
<!-- acknowledge:auto 自動確認(預設), manual手動確認 -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
<rabbit:listener queues="queue" ref="consumer"/>
</rabbit:listener-container>
</beans>