1. 程式人生 > >RabbitMQ 配置檔案詳解(生產者和消費者)

RabbitMQ 配置檔案詳解(生產者和消費者)

一、rabbitmq 配置檔案 在web 專案開發過程中,一般分為生產者配置檔案和消費者配置檔案。廢話少說,馬上教您整個流程的配置!

1、準備工作:安裝好rabbitmq,並在專案中增加配置檔案   rabbit.properties 內容如下:

rmq.ip=192.188.113.114   
rmq.port=5672
rmq.producer.num=20
rmq.manager.user=admin
rmq.manager.password=admin

二、生產者配置檔案:producer.xml

1、<!-- 建立連線類 連線安裝好的 rabbitmq -->

  
    <bean id="connectionFactory"  class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <constructor-arg value="localhost" />    
        <property name="username" value="${rmq.manager.user}" />    
        <property name="password" value="${rmq.manager.password}" />   
        <property name="host" value="${rmq.ip}" />   
        <property name="port" value="${rmq.port}" />   
    </bean>   

2、spring amqp預設的是jackson 的一個外掛,目的將生產者生產的資料轉換為json存入訊息佇列,由於fastjson的速度快於jackson,這裡替換為fastjson的一個實現

<bean id="jsonMessageConverter" class="com.jy.utils.FastJsonMessageConverter"></bean>

3、 spring template   宣告, durable:是否持久化 ; exclusive: 僅建立者可以使用的私有佇列,斷開後自動刪除;auto_delete: 當所有消費客戶端連線斷開後,是否自動刪除佇列.

       queue 佇列宣告 需要傳送訊息到哪些佇列 訊息系統監聽佇列

<rabbit:template exchange="test-exchange" id="rabbitTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter" />
    <rabbit:queue id="test_queue" durable="true" auto-delete="false" exclusive="false" name="test_queue" />

4、訊息失效後監聽佇列,6000 為 時間間隔資訊 60s   int或long型別,解決優先順序問題

<rabbit:queue id="test_delay_queue" durable="true" auto-delete="false" exclusive="false" name="test_delay_queue">
		<rabbit:queue-arguments>
            <entry key="x-message-ttl">
                <value  type="java.lang.Long">60000</value>
            </entry>
            <entry key="x-dead-letter-exchange" value="test_Exchange"/>  
        </rabbit:queue-arguments>
	</rabbit:queue>

5、rabbitmq的三種模式:direct,fanout,topic 三種

direct 訊息轉換佇列 繫結key,意思就是訊息與一個特定的路由鍵匹配,會轉發。rabbit:binding:設定訊息queue匹配的key。

<rabbit:direct-exchange name="test_Exchange" durable="true" auto-delete="false" id="test_Exchange">  
	    <rabbit:bindings>  
	        <rabbit:binding queue="test_queue" key="test_key" />  
	    </rabbit:bindings>
</rabbit:direct-exchange> 

fanout 模式:客戶端中只要是與該路由繫結在一起的佇列都會收到相關訊息,這類似廣播,傳送端不管佇列是誰,都由客戶端自己去繫結,誰需要資料誰去繫結自己的相應佇列。

<rabbit:fanout-exchange name="delayed_message_exchange" durable="true" auto-delete="false" id="delayed_message_exchange">  
	    <rabbit:bindings>  
	        <rabbit:binding queue="test_delay_queue"/>  
	    </rabbit:bindings>  
</rabbit:fanout-exchange>

topic 模式:傳送端不是按固定的routing key傳送訊息,而是按字串“匹配”傳送,接收端同樣如此。

    <binding queue="testqueue" pattern="*.*.test1"

<rabbit:topic-exchange name="message-exchange" durable="true" auto-delete="false" id="message-exchange">
		<rabbit:bindings>
			<rabbit:binding queue="test_queue" pattern="test_key.*.*" />
		</rabbit:bindings>
</rabbit:topic-exchange>

6、生產者(傳送端)程式碼:

@Resource  
private RabbitTemplate rabbitTemplate;  
public void sendMessage(CommonMessage msg){
    	 try {  
              logger.error("傳送資訊開始");
              System.out.println(rabbitTemplate.getConnectionFactory().getHost());  
             //傳送資訊  message-exchange 交換機 msg.getSource() 為 test_key 
             rabbitTemplate.convertAndSend("message-exchange",msg.getSource(), msg);
             logger.error("傳送資訊結束");
         } catch (Exception e) {  
         	e.printStackTrace();
         }
    }

三、消費者配置:同一個專案中 consumer.xml

1、連線服務配置

<rabbit:connection-factory id="connectionFactory" host="${rmq.ip}" username="${rmq.manager.user}" password="${rmq.manager.password}" port="${rmq.port}" />
	<rabbit:admin connection-factory="connectionFactory" />

2、spring amqp預設的是jackson 的一個外掛,目的將生產者生產的資料轉換為json存入訊息佇列,由於fastjson的速度快於jackson,這裡替換為fastjson的一個實現

<bean id="jsonMessageConverter" class="com.jy.utils.FastJsonMessageConverter"></bean>

3、自定義介面類

<bean id="testHandler" class="com.rabbit.TestHandler"></bean>

4、queue 佇列宣告 需要傳送訊息到哪些佇列 ,訊息系統監聽佇列

<rabbit:queue id="test_queue" durable="true" auto-delete="false" exclusive="false" name="test_queue" />

5、topic 模式 繫結。注意:此處僅寫一種模式;根據需求可以配多種模式。

<rabbit:topic-exchange name="message-exchange" durable="true" auto-delete="false" id="message-exchange">
		<rabbit:bindings>
			<rabbit:binding queue="test_queue" pattern="test_key" />
		</rabbit:bindings>
</rabbit:topic-exchange>

6、用於訊息的監聽的代理類MessageListenerAdapter

<bean id="testQueueListenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter" >
		<constructor-arg ref="testHandler" />//類名
		<property name="defaultListenerMethod" value="handlerTest"></property>//方法名
		<property name="messageConverter" ref="jsonMessageConverter"></property>
</bean>

7、配置監聽  acknowledeg = "manual"   設定手動應答  當訊息處理失敗時:會一直重發  直到訊息處理成功,監聽容器

acknowledge="auto" concurrency="30"  設定傳送次數,最多傳送30次

<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" concurrency="20">
		<rabbit:listener queues="test_queue" ref="testQueueListenerAdapter" />
</rabbit:listener-container>

8、消費端程式碼:TestHandler 類

public class TestHandler  {
    @Override
    public void handlerTest(CommonMessage commonMessage) {
        System.out.println("DetailQueueConsumer: " + new String(message.getBody()));
    }
}

總結:這是rabbitmq 從生產者到消費者 的整個流程配置,能夠直接在web開發中使用,博主親測使用;如果還有不明白的小夥伴,可以留言,博主定會詳細回覆!