1. 程式人生 > >RabbitMQ學習(十一)之spring整合傳送非同步訊息

RabbitMQ學習(十一)之spring整合傳送非同步訊息

實現使用Exchange型別為DirectExchange. routingkey的名稱預設為Queue的名稱。非同步傳送訊息。

1.配置檔案

[plain] view plain copy print?在CODE上檢視程式碼片派生到我的程式碼片
  1. #============== rabbitmq config ====================  
  2. rabbit.hosts=192.168.36.102  
  3. rabbit.username=admin  
  4. rabbit.password=admin  
  5. rabbit.virtualHost=/  
  6. rabbit.queue=spring-queue-async  
  7. rabbit.routingKey=spring-queue-async#<span style="font-family: Helvetica, Tahoma, Arial, sans-serif; font-size: 14px; line-height: 25.2000007629395px;">routingkey的名稱預設為Queue的名稱</span>  
2.生產者配置applicationContext-rabbitmq-async-send.xml: [html] view plain copy print?在CODE上檢視程式碼片派生到我的程式碼片
  1. <?xmlversion="1.0"encoding="UTF-8"?>
  2. <beansxmlns="http://www.springframework.org/schema/beans"
  3.        xmlns:context="http://www.springframework.org/schema/context"
  4.        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  5.        xsi:schemaLocation="  
  6.         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd  
  7.         http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd">
  8.     <beanclass="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"
    >
  9.         <propertyname="systemPropertiesModeName"value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/>
  10.         <propertyname="ignoreResourceNotFound"value="true"/>
  11.         <propertyname="locations">
  12.             <list>
  13.                 <!-- 標準配置 -->
  14.                 <value>classpath*:/application.properties</value>
  15.             </list>
  16.         </property>
  17.     </bean>
  18.     <!-- 建立connectionFactory -->
  19.     <beanid="rabbitConnectionFactory"
  20.       class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
  21.         <constructor-argvalue="${rabbit.hosts}"/>
  22.         <propertyname="username"value="${rabbit.username}"/>
  23.         <propertyname="password"value="${rabbit.password}"/>
  24.         <propertyname="virtualHost"value="${rabbit.virtualHost}"/>
  25.         <propertyname="channelCacheSize"value="5"/>
  26.     </bean>
  27.     <!-- 建立rabbitAdmin 代理類 -->
  28.     <beanid="rabbitAdmin"
  29.         class="org.springframework.amqp.rabbit.core.RabbitAdmin">
  30.         <constructor-argref="rabbitConnectionFactory"/>
  31.     </bean>
  32.     <!-- 建立rabbitTemplate 訊息模板類 
  33.      -->
  34.     <beanid="rabbitTemplate"
  35.         class="org.springframework.amqp.rabbit.core.RabbitTemplate">
  36.         <constructor-argref="rabbitConnectionFactory"></constructor-arg>
  37.         <propertyname="routingKey"value="${rabbit.routingKey}"></property>
  38.     </bean>
  39. </beans>
3.生產者傳送訊息程式碼Send.java [java] view plain copy print?在CODE上檢視程式碼片派生到我的程式碼片
  1. package cn.slimsmart.rabbitmq.demo.spring.async;  
  2. import org.springframework.amqp.core.AmqpTemplate;  
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;  
  4. import org.springframework.context.ApplicationContext;  
  5. import org.springframework.context.support.ClassPathXmlApplicationContext;  
  6. publicclass Send {  
  7.     publicstaticvoid main(String[] args) throws InterruptedException {  
  8.         ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext-rabbitmq-async-send.xml");    
  9.         AmqpTemplate amqpTemplate = context.getBean(RabbitTemplate.class);    
  10.         for(int i=0;i<1000;i++){  
  11.             amqpTemplate.convertAndSend("test spring async=>"+i);   
  12.             Thread.sleep(3000);  
  13.         }  
  14.     }  
  15. }  

4.處理訊息類ReceiveMsgHandler.Java

[java] view plain copy print?在CODE上檢視程式碼片派生到我的程式碼片
  1. package cn.slimsmart.rabbitmq.demo.spring.async;  
  2. publicclass ReceiveMsgHandler {  
  3.     publicvoid handleMessage(String text) {  
  4.         System.out.println("Received: " + text);  
  5.     }  
  6. }  
5.配置applicationContext-rabbitmq-async-receive.xml: [html] view plain copy print?在CODE上檢視程式碼片派生到我的程式碼片
  1. <?xmlversion="1.0"encoding="UTF-8"?>
  2. <beansxmlns="http://www.springframework.org/schema/beans"
  3.        xmlns:context="http://www.springframework.org/schema/context"
  4.        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  5.        xsi:schemaLocation="  
  6.         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd  
  7.         http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd">
  8.     <beanclass="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
  9.         <propertyname="systemPropertiesModeName"value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/>
  10.         <propertyname="ignoreResourceNotFound"value="true"/>
  11.         <propertyname="locations">
  12.             <list>
  13.                 <!-- 標準配置 -->
  14.                 <value>classpath*:/application.properties</value>
  15.             </list>
  16.         </property>
  17.     </bean>
  18.     <!-- 建立connectionFactory -->
  19.     <beanid="rabbitConnectionFactory"
  20.       class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
  21.         <constructor-argvalue="${rabbit.hosts}"/>
  22.         <propertyname="username"value="${rabbit.username}"/>
  23.         <propertyname="password"value="${rabbit.password}"/>
  24.         <propertyname="virtualHost"value="${rabbit.virtualHost}"/>
  25.         <propertyname="channelCacheSize"value="5"/>
  26.     </bean>
  27.     <!-- 宣告訊息轉換器為SimpleMessageConverter -->
  28.     <beanid="messageConverter"
  29.         class="org.springframework.amqp.support.converter.SimpleMessageConverter">
  30.     </bean>
  31.     <!-- 監聽生產者傳送的訊息開始 -->
  32.     <!-- 用於接收訊息的處理類 -->
  33.     <beanid="receiveHandler"
  34.         class="cn.slimsmart.rabbitmq.demo.spring.async.ReceiveMsgHandler">
  35.     </bean>
  36.     <!-- 用於訊息的監聽的代理類MessageListenerAdapter -->
  37.     <beanid="receiveListenerAdapter"
  38.         class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
  39.         <constructor-argref="receiveHandler"/>
  40.         <propertyname="defaultListenerMethod"value="handleMessage"></property>
  41.         <propertyname="messageConverter"ref="messageConverter"></property>
  42.     </bean>
  43.     <!-- 用於訊息的監聽的容器類SimpleMessageListenerContainer,對於queueName的值一定要與定義的Queue的值相同 -->
  44.     <beanid="listenerContainer"
  45.         class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
  46.         <propertyname="queueNames"value="${rabbit.queue}"></property>
  47.         <propertyname="connectionFactory"ref="rabbitConnectionFactory"></property>
  48.         <propertyname="messageListener"ref="receiveListenerAdapter"></property>
  49.     </bean>
  50.     <!-- 監聽生產者傳送的訊息結束 -->
  51. </beans>
5.接收訊息啟動類Receive.java [java] view plain copy print?在CODE上檢視程式碼片派生到我的程式碼片
  1. package cn.slimsmart.rabbitmq.demo.spring.async;  
  2. import org.springframework.context.support.ClassPathXmlApplicationContext;  
  3. publicclass Receive {  
  4.     publicstaticvoid main(String[] args) {  
  5.          new ClassPathXmlApplicationContext("applicationContext-rabbitmq-async-receive.xml");  
  6.     }  
  7. }  
啟動接收訊息,再發送訊息 [plain] view plain copy print?在CODE上檢視程式碼片派生到我的程式碼片
  1. Received: test spring async=>0  
  2. Received: test spring async=>1  
  3. Received: test spring async=>2  
  4. Received: test spring async=>3  
  5. Received: test spring async=>4  
  6. Received: test spring async=>5  
  7. Received: test spring async=>6  
  8. Received: test spring async=>7  
  9. ......  
若報如下錯誤,說明訊息佇列不存在,請在控制檯新增訊息佇列。 [java] view plain copy print?在CODE上檢視程式碼片派生到我的程式碼片
  1. log4j:WARN No appenders could be found for logger (org.springframework.core.env.StandardEnvironment).  
  2. log4j:WARN Please initialize the log4j system properly.  
  3. Exception in thread "main" org.springframework.context.ApplicationContextException: Failed to start bean 'listenerContainer'; nested exception is org.springframework.amqp.AmqpIllegalStateException: Fatal exception on listener startup  
  4.     at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:170)  
  5.     at org.springframework.context.support.DefaultLifecycleProcessor.access$1(DefaultLifecycleProcessor.java:154)  
  6.     at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:339)  
  7.     at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:143)  
  8.     at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:108)  
  9.     at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:931)  
  10.     at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:472)  
  11.     at org.springframework.context.annotation.AnnotationConfigApplicationContext.<init>(AnnotationConfigApplicationContext.java:73)  
  12.     at cn.slimsmart.rabbitmq.demo.spring.async.Consumer.main(Consumer.java:7)  
  13. Caused by: org.springframework.amqp.AmqpIllegalStateException: Fatal exception on listener startup  
  14.     at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doStart(SimpleMessageListenerContainer.java:333)  
  15.     at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:360)  
  16.     at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:167)  
  17.     ... 8 more  
  18. Caused by: org.springframework.amqp.rabbit.listener.FatalListenerStartupException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.  
  19.     at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:228)  
  20.     at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:516)  
  21.     at java.lang.Thread.run(Unknown Source)  
  22. Caused by: java.io.IOException  
  23.     at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)  
  24.     at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)  
  25.     at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)  
  26.     at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:788)  
  27.     at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:61)  
  28.     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)  
  29.     at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)  
  30.     at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)  
  31.     at java.lang.reflect.Method.invoke(Unknown Source)  
  32.     at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:348)  
  33.     at com.sun.proxy.$Proxy8.queueDeclarePassive(Unknown Source)  
  34.     at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:213)  
  35.     ... 2 more  
  36. Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'spring-queue-async' in vhost '/'class-id=50, method-id=10), null""}  
  37.     at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)  
  38.     at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)  
  39.     at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)  
  40.     at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)  
  41.     at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)  
  42.     ... 11 more  
  43. Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'spring-queue-async' in vhost '/'class-id=50, method-id=10), null""}  
  44.     at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:473)  
  45.     at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:313)  
  46.     at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)  
  47.     at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)  
  48.     at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:533)  
控制檯新增佇列。