1. 程式人生 > >rabbitmq與springmvc結合

rabbitmq與springmvc結合

RabbitMq

         訊息佇列(MQ)是一種應用程式對應用程式的通訊方法。應用程式通過讀寫出入佇列的訊息(針對應用程式的資料)來通訊,而無需專用連線來連結它們。訊息傳遞指的是程式之間通過在訊息中傳送資料進行通訊,而不是通過直接呼叫彼此來通訊,直接呼叫通常是用於諸如遠端過程呼叫的技術。排隊指的是應用程式通過佇列來通訊。佇列的使用除去了接收和傳送應用程式同時執行的要求。其中較為成熟的MQ產品有IBM WEBSPHERE MQ。

RabbitMQ的結構圖如下:(原文的結構圖在此處缺失)
RabbitMQ一般的使用場景:
1. 作為一種事件繫結監聽處理:當client觸發了某一個exchange時,RabbitMQ會將取得的資料塞入到queue中。由於在配置檔案中已經配置好了對於queue的監聽介面,當有資料塞入到queue中時就會觸發介面執行程式。
2. 這種服務也可以結合redis作為一種延時處理的操作:將監聽queue的介面繫結到redis,把觸發得到的資料儲存到redis中,然後再通過執行緒由後臺監聽資料物件。
Rabbit 資料生產者XML配置檔案
  <?xml version="1.0" encoding="UTF-8"?>
  <beans xmlns="http://www.springframework.org/schema/beans"
         xmlns:rabbit="http://www.springframework.org/schema/rabbit"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="
             http://www.springframework.org/schema/beans
             http://www.springframework.org/schema/beans/spring-beans.xsd
             http://www.springframework.org/schema/rabbit
             http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <!-- Producer-side configuration. -->

    <!-- Helper bean used by application code to publish messages. -->
    <bean id="rabbitUtil" class="com.rabbit.util.RabbitUtil"/>

    <!-- Broker connection, with publisher confirms switched on. -->
    <rabbit:connection-factory id="connectionFactory"
                               addresses="127.0.0.1:5672"
                               publisher-confirms="true"/>

    <!-- Jackson-based converter: payloads produced here are stored in the queue as JSON. -->
    <bean id="jsonMessageConverter"
          class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>

    <!-- Template the application publishes through; my-mq-exchange is its default exchange. -->
    <rabbit:template id="amqpTemplate"
                     connection-factory="connectionFactory"
                     exchange="my-mq-exchange"
                     message-converter="jsonMessageConverter"/>

    <!-- Rabbit admin for the connection factory above. -->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- Queue declarations: durable, not auto-deleted, not exclusive. -->
    <rabbit:queue id="queue_one" name="queue_one"
                  durable="true" auto-delete="false" exclusive="false"/>
    <rabbit:queue id="queue_tow" name="queue_tow"
                  durable="true" auto-delete="false" exclusive="false"/>
    <rabbit:queue id="queue_three" name="queue_three"
                  durable="true" auto-delete="false" exclusive="false"/>

    <!-- Fanout exchange bound to queue_one and queue_tow (the bindings name a queue only, no key). -->
    <rabbit:fanout-exchange id="my-mq-exchange" name="my-mq-exchange"
                            durable="true" auto-delete="false">
      <rabbit:bindings>
        <rabbit:binding queue="queue_one"/>
        <rabbit:binding queue="queue_tow"/>
      </rabbit:bindings>
    </rabbit:fanout-exchange>

    <!-- Second fanout exchange, bound to all three queues. -->
    <rabbit:fanout-exchange id="mq-exchange2" name="mq-exchange2"
                            durable="true" auto-delete="false">
      <rabbit:bindings>
        <rabbit:binding queue="queue_one"/>
        <rabbit:binding queue="queue_tow"/>
        <rabbit:binding queue="queue_three"/>
      </rabbit:bindings>
    </rabbit:fanout-exchange>

  </beans>
Rabbit 事件監聽XML配置
  <?xml version="1.0" encoding="UTF-8"?>
  <beans xmlns="http://www.springframework.org/schema/beans"
         xmlns:rabbit="http://www.springframework.org/schema/rabbit"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="
             http://www.springframework.org/schema/beans
             http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
             http://www.springframework.org/schema/rabbit
             http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    <!-- Consumer-side configuration. -->

    <!-- Bean whose methods handle the messages delivered by the listeners below. -->
    <bean id="consumeMessage" class="com.rabbit.test.QueueOneListener"/>

    <!-- Broker connection. -->
    <rabbit:connection-factory id="connectionFactory"
                               host="127.0.0.1"
                               port="5672"
                               username="guest"
                               password="guest"/>

    <!-- Jackson-based converter handed to the listener container. -->
    <bean id="jacksonConverter"
          class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>

    <!-- Rabbit admin for the connection factory above. -->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!--
      Listener container: each listener entry names a queue and the method of
      consumeMessage that handles messages from it.
      NOTE(review): three listeners are registered on queue_three; presumably they
      share that queue's messages between them. Confirm this is intended.
    -->
    <rabbit:listener-container connection-factory="connectionFactory"
                               message-converter="jacksonConverter"
                               acknowledge="none">
      <rabbit:listener queue-names="queue_one" ref="consumeMessage" method="listenOne"/>
      <rabbit:listener queue-names="queue_tow" ref="consumeMessage" method="listenTwo"/>
      <rabbit:listener queue-names="queue_three" ref="consumeMessage" method="listenThree"/>
      <rabbit:listener queue-names="queue_three" ref="consumeMessage" method="listenThree1"/>
      <rabbit:listener queue-names="queue_three" ref="consumeMessage" method="testRedisAnnotation"/>
    </rabbit:listener-container>

  </beans>
        通過配置rabbit:listener,可以設定某個queue對應的監聽處理方法。
封裝一個Rabbit例項
  1. package com.rabbit.util;
  2. import org.springframework.amqp.AmqpException;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.core.MessageDeliveryMode;
  5. import org.springframework.amqp.core.MessagePostProcessor;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.amqp.rabbit.support.CorrelationData;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.stereotype.Component;
  10. /**
  11. * 用於提供Rabbit的操作類
  12. * Created by Michael Zhao on 14-4-1.
  13. */
  14. @Component
  15. public class RabbitUtil {
  16. private final RabbitTemplate rabbitTemplate;
  17. @Autowired
  18. public RabbitUtil(RabbitTemplate rabbitTemplate){
  19. this.rabbitTemplate = rabbitTemplate;
  20. }
  21. public <T> void sendReliable(String exchange, T message) {
  22. //實現將message通過json轉換&將物件傳送
  23. //convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData)
  24. rabbitTemplate.convertAndSend(exchange, "", message, new MessagePostProcessor() {
  25. //實現message操作處理實現
  26. @Override
  27. public Message postProcessMessage(Message message) throws AmqpException {
  28. //設定資訊的屬性資訊&設定傳送模式(PERSISTENT:連續的)
  29. message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
  30. return message;
  31. }
  32. }, new CorrelationData(String.valueOf(message)));
  33. }
  34. }
rabbit監聽介面層
  1. package com.rabbit.test;
  2. import com.rabbit.ActionInterface.RedisNoResultAction;
  3. import com.rabbit.annotation.RedisLock;
  4. import com.rabbit.util.RedisKeys;
  5. import com.rabbit.util.RedisLockExecute;
  6. import com.rabbit.util.RedisLockModel;
  7. import com.rabbit.util.RedisTemplate;
  8. import org.junit.Before;
  9. import org.slf4j.Logger;
  10. import org.slf4j.LoggerFactory;
  11. import org.springframework.beans.factory.annotation.Autowired;
  12. import org.springframework.stereotype.Component;
  13. import redis.clients.jedis.Jedis;
  14. import java.util.concurrent.TimeUnit;
  15. /**
  16. * 用於提供Rabbit監聽介面層
  17. * Created by Michael Zhao on 14-4-1.
  18. */
  19. @Component
  20. public class QueueOneListener{
  21. @Autowired
  22. private RedisTemplate redisTemplate;
  23. private static final Logger log = LoggerFactory.getLogger(QueueOneListener.class);
  24. @Before
  25. public void setUp(){
  26. //銷燬所有已經建立的鎖
  27. RedisLockExecute.destructionLocks(redisTemplate);
  28. }
  29. public void listenOne(Object foo) {
  30. System.out.println("listen1");
  31. System.out.println(foo);
  32. }
  33. public void listenTwo(Object foo) {
  34. System.out.println("listen2");
  35. System.out.println(foo);
  36. }
  37. public void listenThree(Object foo) {
  38. System.out.println("listen3");
  39. System.out.println(foo);
  40. }
  41. public void listenThree1(Object foo){
  42. System.out.println("listenThree->"+foo);
  43. }
  44. /**
  45. * 實現將資訊塞入到redis中
  46. * @param foo 物件資訊
  47. */
  48. public void sendRedis(final Object foo){
  49. //這個是針對分散式環境下的鎖機制
  50. RedisLockModel redisLockModel = RedisLockExecute.acquireLock(redisTemplate, RedisKeys.REDIS_TEST, 1000, (int) TimeUnit.SECONDS.toSeconds(10));
  51. try{
  52. if(RedisLockExecute.ACQUIRE_RESULT(redisLockModel)){
  53. redisTemplate.execute(new RedisNoResultAction() {
  54. @Override
  55. public void actionNoResult(Jedis jedis) {
  56. jedis.lpush("sendRedis:" , foo.toString());
  57. }
  58. });
  59. }else{
  60. log.debug("acquire lock is failed!");
  61. }
  62. } catch (Exception e){
  63. log.error("send Redis failed , error={}", e);
  64. }finally {
  65. //釋放鎖
  66. RedisLockExecute.releaseLock(redisTemplate, redisLockModel);
  67. }
  68. }
  69. @RedisLock(redisKeys = RedisKeys.REDIS_TEST , maxWait = 10, expiredTime = 1000)
  70. public void testRedisAnnotation(final Object foo){
  71. try{
  72. redisTemplate.execute(new RedisNoResultAction() {
  73. @Override
  74. public void actionNoResult(Jedis jedis) {
  75. jedis.lpush("sendRedis:" , foo.toString());
  76. }
  77. });
  78. } catch (Exception e){
  79. log.error("send Redis failed , error={}", e);
  80. }
  81. }
  82. }
rabbit測試類
  1. package com.rabbit.test;
  2. import com.rabbit.util.RabbitUtil;
  3. import org.junit.Test;
  4. import org.junit.runner.RunWith;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.test.context.ContextConfiguration;
  7. import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
  8. @RunWith(SpringJUnit4ClassRunner.class)
  9. @ContextConfiguration(locations = {
  10. "classpath:/spring/root-context.xml",
  11. "classpath:/spring/rabbit-product.xml",
  12. "classpath:/spring/rabbit-context.xml",
  13. "classpath:/spring/redis-context.xml"
  14. })
  15. public class RabbitTest {
  16. @Autowired
  17. private RabbitUtil rabbitUtil;
  18. private Integer number = 1;
  19. @Test
  20. public void testRabbit() {
  21. while(true){
  22. rabbitUtil.sendReliable("my-mq-exchange", number++);
  23. rabbitUtil.sendReliable("mq-exchange2", number++);
  24. try {
  25. Thread.sleep(1000);
  26. } catch (InterruptedException e) {
  27. e.printStackTrace();
  28. }
  29. }
  30. }
  31. }
提示:如果想要執行這些程式碼,請設定對應的Spring環境。對於redis的配置,這邊可以不設定,直接去除即可。redis的介紹將在下一篇中涉及。