1. 程式人生 > >RabbitMQ整合Spring配置檔案詳解

RabbitMQ整合Spring配置檔案詳解

一、rabbitmq 配置檔案 在web 專案開發過程中,一般分為生產者配置檔案和消費者配置檔案。

一、準備工作

安裝好rabbitmq,並在專案中增加配置檔案 rabbit.properties 內容如下:

rmq.ip=192.188.113.114   
rmq.port=5672
rmq.producer.num=20
rmq.manager.user=admin
rmq.manager.password=admin

二、生產者配置檔案:producer.xml

  1. 配置連線類的bean
    <bean id="connectionFactory"  class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <constructor-arg value="localhost" />    
        <property name="username" value="${rmq.manager.user}" />    
        <property name="password" value="${rmq.manager.password}" />   
        <property name="host" value="${rmq.ip}" />   
        <property name="port" value="${rmq.port}" />   
    </bean>   
  1. spring amqp預設的是jackson 的一個外掛,目的將生產者生產的資料轉換為json存入訊息佇列,由於fastjson的速度快於jackson,這裡替換為fastjson的一個實現,(可以替換成任意的MessageConvert實現)
<bean id="jsonMessageConverter" class="com.jy.utils.FastJsonMessageConverter"></bean>
  1. spring template 宣告, durable:是否持久化 ; exclusive: 僅建立者可以使用的私有佇列,斷開後自動刪除;auto_delete: 當所有消費客戶端連線斷開後,是否自動刪除佇列.。也可以在配置時指明預設使用的exchange與routingkey、·
<rabbit:template exchange="test-exchange" id="rabbitTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter" routing-key="xxx" exchange="xxx"/>
  1. 佇列宣告,使用name屬性或者id屬性指定佇列名。durable屬性設定是夠將訊息持久化,exclusive屬性設定訊息佇列是否是獨佔的
<rabbit:queue id="test_delay_queue" durable="true" auto-delete="false" exclusive="false" name="test_delay_queue">
		<rabbit:queue-arguments>
            <entry key="x-message-ttl">
                <value  type="java.lang.Long">60000</value>
            </entry>
            <entry key="x-dead-letter-exchange" value="test_Exchange"/>  
        </rabbit:queue-arguments>
	</rabbit:queue>
  1. 配置exchange

direct

<rabbit:direct-exchange name="test_Exchange" durable="true" auto-delete="false" id="test_Exchange">  
	    <rabbit:bindings>  
	        <rabbit:binding queue="test_queue" key="test_key" />  
	    </rabbit:bindings>
</rabbit:direct-exchange> 

fanout

<rabbit:fanout-exchange name="delayed_message_exchange" durable="true" auto-delete="false" id="delayed_message_exchange">  
	    <rabbit:bindings>  
	        <rabbit:binding queue="test_delay_queue"/>  
	    </rabbit:bindings>  
</rabbit:fanout-exchange>

topic

    <binding queue="testqueue" pattern="*.*.test1"

<rabbit:topic-exchange name="message-exchange" durable="true" auto-delete="false" id="message-exchange">
		<rabbit:bindings>
			<rabbit:binding queue="test_queue" pattern="test_key.*.*" />
		</rabbit:bindings>
</rabbit:topic-exchange>

在子標籤<rabbit:bindings>中宣告繫結鍵與佇列的對應關係

  1. 生產者(傳送端)程式碼:
@Resource  
private RabbitTemplate rabbitTemplate;  
public void sendMessage(CommonMessage msg){
    	 try {  
              logger.error("傳送資訊開始");
              System.out.println(rabbitTemplate.getConnectionFactory().getHost());  
             //傳送資訊  message-exchange 交換機 msg.getSource() 為 test_key 
             rabbitTemplate.convertAndSend("message-exchange",msg.getSource(), msg);
             logger.error("傳送資訊結束");
         } catch (Exception e) {  
         	e.printStackTrace();
         }
    }

rabbitTemplate傳送訊息時若不指定exchange和routingkey則使用配置時預設的。

三、消費者配置

  1. 宣告訊息佇列的服務端連線,ConnectionFactory的配置與生存者相同
<rabbit:admin connection-factory="newRabbitConnFactory" />
  1. 配置佇列的中訊息的處理類
	<bean id="refreshSingleStatisticJmsListener"
class="com.netease.edu.k12.server.course.manager.jms.listener.RefreshSingleStatisticJmsListener" />

類內容

@Component("refreshSingleStatisticJmsListener")
public class RefreshSingleStatisticJmsListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
               ...
    }
}

監聽類需要實現MessageListener介面並重寫onMessage方法,方法的入參Message中包含了訊息的載荷(一個位元組陣列,請根據需要進行反序列化)。

  1. 繫結監聽的佇列
	<rabbit:listener-container
			connection-factory="newRabbitConnFactory" prefetch="3" concurrency="50"
			acknowledge="auto">

		<rabbit:listener ref="refreshSingleStatisticJmsListener"
						 queues="refreshSingleStatistic-course-1.0.0${local_service_version_suffix}" />

        <rabbit:listener ref="syncYktCourseEnrollJmsListener"
                         queues="ykt-courseEnrollSyncJmsQueue-key-${dubbo_study_version}"/>
	</rabbit:listener-container>

<rabbit:listener>子標籤的ref屬性指定監聽處理類,queues指定被監聽的佇列。

這樣當監聽的佇列中有訊息時,訊息就會被取出交給監聽訊息的佇列進行相應操作。