rabbitmq與springmvc結合
阿新 • • 發佈:2018-12-18
RabbitMq
訊息佇列(MQ)是一種應用程式對應用程式的通訊方法。應用程式通過讀寫出入佇列的訊息(針對應用程式的資料)來通訊,而無需專用連線來連結它們。訊息傳遞指的是程式之間通過在訊息中傳送資料進行通訊,而不是通過直接呼叫彼此來通訊;直接呼叫通常用於諸如遠端過程呼叫之類的技術。排隊指的是應用程式通過佇列來通訊。佇列的使用免除了接收和傳送應用程式必須同時執行的要求。其中較為成熟的MQ產品有IBM WEBSPHERE MQ。
RabbitMQ的結構圖如下:
RabbitMQ一般的使用場景:
1. 作為一種事件繫結監聽處理:當client向某一個exchange傳送訊息時,RabbitMQ會將取得的資料塞入到queue中。由於在配置檔案中已經配置好了對於queue的監聽介面,當有資料塞入到queue中時就會觸發該介面執行程式。
2. 這種服務也可以結合redis作為一種延時處理的操作:在監聽queue的介面中將觸發得到的資料儲存到redis中,然後再通過執行緒由後臺監聽資料物件。
Rabbit 資料生產者XML配置檔案
- <?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <!-- Helper bean used by application code to publish messages. -->
    <bean id="rabbitUtil" class="com.rabbit.util.RabbitUtil"/>

    <!-- ===== Producer-side messaging configuration ===== -->

    <!-- Broker connection (publisher confirms enabled). -->
    <rabbit:connection-factory id="connectionFactory"
                               addresses="127.0.0.1:5672"
                               publisher-confirms="true"/>

    <!-- Jackson-based converter: payloads are serialised to JSON before being placed on the queue. -->
    <bean id="jsonMessageConverter"
          class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>

    <!-- Template through which messages are published; my-mq-exchange is its default exchange. -->
    <rabbit:template id="amqpTemplate"
                     connection-factory="connectionFactory"
                     exchange="my-mq-exchange"
                     message-converter="jsonMessageConverter"/>

    <!-- RabbitMQ admin bound to the connection factory above. -->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- Queue declarations: durable, non-exclusive, not auto-deleted. -->
    <rabbit:queue id="queue_one" name="queue_one" durable="true" exclusive="false" auto-delete="false"/>
    <rabbit:queue id="queue_tow" name="queue_tow" durable="true" exclusive="false" auto-delete="false"/>
    <rabbit:queue id="queue_three" name="queue_three" durable="true" exclusive="false" auto-delete="false"/>

    <!-- First fanout exchange: bound to queue_one and queue_tow (no routing key is specified). -->
    <rabbit:fanout-exchange id="my-mq-exchange" name="my-mq-exchange" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="queue_one"/>
            <rabbit:binding queue="queue_tow"/>
        </rabbit:bindings>
    </rabbit:fanout-exchange>

    <!-- Second fanout exchange: bound to all three queues. -->
    <rabbit:fanout-exchange id="mq-exchange2" name="mq-exchange2" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="queue_one"/>
            <rabbit:binding queue="queue_tow"/>
            <rabbit:binding queue="queue_three"/>
        </rabbit:bindings>
    </rabbit:fanout-exchange>

</beans>
Rabbit 事件監聽XML配置
- <?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    <!-- Listener object whose methods are invoked for queue deliveries. -->
    <bean id="consumeMessage" class="com.rabbit.test.QueueOneListener"/>

    <!-- Broker connection. -->
    <rabbit:connection-factory id="connectionFactory"
                               host="127.0.0.1"
                               port="5672"
                               username="guest"
                               password="guest"/>

    <!-- JSON converter handed to the listener container below. -->
    <bean id="jacksonConverter"
          class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>

    <!-- RabbitMQ admin bound to the connection factory above. -->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!--
        Consumer side: each <rabbit:listener> maps a queue to one method on consumeMessage.
        NOTE(review): acknowledge="none" means no acknowledgements are sent, so a delivery
        that fails in the listener is presumably not redelivered - confirm this is acceptable.
        NOTE(review): three listeners share queue_three; each message is presumably handled
        by only one of them rather than by all three - confirm this is intended.
    -->
    <rabbit:listener-container connection-factory="connectionFactory"
                               message-converter="jacksonConverter"
                               acknowledge="none">
        <rabbit:listener queue-names="queue_one" ref="consumeMessage" method="listenOne"/>
        <rabbit:listener queue-names="queue_tow" ref="consumeMessage" method="listenTwo"/>
        <rabbit:listener queue-names="queue_three" ref="consumeMessage" method="listenThree"/>
        <rabbit:listener queue-names="queue_three" ref="consumeMessage" method="listenThree1"/>
        <rabbit:listener queue-names="queue_three" ref="consumeMessage" method="testRedisAnnotation"/>
    </rabbit:listener-container>

</beans>
通過配置rabbit:listener來設定對於某個queue的監聽方法的處理設定
封裝一個Rabbit例項
- package com.rabbit.util;
- import org.springframework.amqp.AmqpException;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.core.MessageDeliveryMode;
- import org.springframework.amqp.core.MessagePostProcessor;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.amqp.rabbit.support.CorrelationData;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- /**
- * 用於提供Rabbit的操作類
- * Created by Michael Zhao on 14-4-1.
- */
- @Component
- public class RabbitUtil {
- private final RabbitTemplate rabbitTemplate;
- @Autowired
- public RabbitUtil(RabbitTemplate rabbitTemplate){
- this.rabbitTemplate = rabbitTemplate;
- }
- public <T> void sendReliable(String exchange, T message) {
- //實現將message通過json轉換&將物件傳送
- //convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData)
- rabbitTemplate.convertAndSend(exchange, "", message, new MessagePostProcessor() {
- //實現message操作處理實現
- @Override
- public Message postProcessMessage(Message message) throws AmqpException {
- //設定資訊的屬性資訊&設定傳送模式(PERSISTENT:連續的)
- message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
- return message;
- }
- }, new CorrelationData(String.valueOf(message)));
- }
- }
rabbit監聽介面層
- package com.rabbit.test;
- import com.rabbit.ActionInterface.RedisNoResultAction;
- import com.rabbit.annotation.RedisLock;
- import com.rabbit.util.RedisKeys;
- import com.rabbit.util.RedisLockExecute;
- import com.rabbit.util.RedisLockModel;
- import com.rabbit.util.RedisTemplate;
- import org.junit.Before;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- import redis.clients.jedis.Jedis;
- import java.util.concurrent.TimeUnit;
- /**
- * 用於提供Rabbit監聽介面層
- * Created by Michael Zhao on 14-4-1.
- */
- @Component
- public class QueueOneListener{
- @Autowired
- private RedisTemplate redisTemplate;
- private static final Logger log = LoggerFactory.getLogger(QueueOneListener.class);
- @Before
- public void setUp(){
- //銷燬所有已經建立的鎖
- RedisLockExecute.destructionLocks(redisTemplate);
- }
- public void listenOne(Object foo) {
- System.out.println("listen1");
- System.out.println(foo);
- }
- public void listenTwo(Object foo) {
- System.out.println("listen2");
- System.out.println(foo);
- }
- public void listenThree(Object foo) {
- System.out.println("listen3");
- System.out.println(foo);
- }
- public void listenThree1(Object foo){
- System.out.println("listenThree->"+foo);
- }
- /**
- * 實現將資訊塞入到redis中
- * @param foo 物件資訊
- */
- public void sendRedis(final Object foo){
- //這個是針對分散式環境下的鎖機制
- RedisLockModel redisLockModel = RedisLockExecute.acquireLock(redisTemplate, RedisKeys.REDIS_TEST, 1000, (int) TimeUnit.SECONDS.toSeconds(10));
- try{
- if(RedisLockExecute.ACQUIRE_RESULT(redisLockModel)){
- redisTemplate.execute(new RedisNoResultAction() {
- @Override
- public void actionNoResult(Jedis jedis) {
- jedis.lpush("sendRedis:" , foo.toString());
- }
- });
- }else{
- log.debug("acquire lock is failed!");
- }
- } catch (Exception e){
- log.error("send Redis failed , error={}", e);
- }finally {
- //釋放鎖
- RedisLockExecute.releaseLock(redisTemplate, redisLockModel);
- }
- }
- @RedisLock(redisKeys = RedisKeys.REDIS_TEST , maxWait = 10, expiredTime = 1000)
- public void testRedisAnnotation(final Object foo){
- try{
- redisTemplate.execute(new RedisNoResultAction() {
- @Override
- public void actionNoResult(Jedis jedis) {
- jedis.lpush("sendRedis:" , foo.toString());
- }
- });
- } catch (Exception e){
- log.error("send Redis failed , error={}", e);
- }
- }
- }
rabbit測試類
- package com.rabbit.test;
- import com.rabbit.util.RabbitUtil;
- import org.junit.Test;
- import org.junit.runner.RunWith;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.test.context.ContextConfiguration;
- import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
- @RunWith(SpringJUnit4ClassRunner.class)
- @ContextConfiguration(locations = {
- "classpath:/spring/root-context.xml",
- "classpath:/spring/rabbit-product.xml",
- "classpath:/spring/rabbit-context.xml",
- "classpath:/spring/redis-context.xml"
- })
- public class RabbitTest {
- @Autowired
- private RabbitUtil rabbitUtil;
- private Integer number = 1;
- @Test
- public void testRabbit() {
- while(true){
- rabbitUtil.sendReliable("my-mq-exchange", number++);
- rabbitUtil.sendReliable("mq-exchange2", number++);
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
提示:如果想要執行這些程式碼,請設定對應的Spring環境。redis的配置在這裡可以不設定,直接去除即可。redis的介紹將在下一篇中涉及。