RabbitMQ 配置檔案詳解(生產者和消費者)
一、rabbitmq 配置檔案 在web 專案開發過程中,一般分為生產者配置檔案和消費者配置檔案。廢話少說,馬上教您整個流程的配置!
1、準備工作:安裝好rabbitmq,並在專案中增加配置檔案 rabbit.properties 內容如下:
rmq.ip=192.188.113.114
rmq.port=5672
rmq.producer.num=20
rmq.manager.user=admin
rmq.manager.password=admin
二、生產者配置檔案:producer.xml
1、<!-- 建立連線類 連線安裝好的 rabbitmq -->
<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"> <constructor-arg value="localhost" /> <property name="username" value="${rmq.manager.user}" /> <property name="password" value="${rmq.manager.password}" /> <property name="host" value="${rmq.ip}" /> <property name="port" value="${rmq.port}" /> </bean>
2、spring amqp預設的是jackson 的一個外掛,目的將生產者生產的資料轉換為json存入訊息佇列,由於fastjson的速度快於jackson,這裡替換為fastjson的一個實現
<bean id="jsonMessageConverter" class="com.jy.utils.FastJsonMessageConverter"></bean>
3、 spring template 宣告, durable:是否持久化 ; exclusive: 僅建立者可以使用的私有佇列,斷開後自動刪除;auto_delete: 當所有消費客戶端連線斷開後,是否自動刪除佇列.
queue 佇列宣告 需要傳送訊息到哪些佇列 訊息系統監聽佇列
<rabbit:template exchange="test-exchange" id="rabbitTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter" /> <rabbit:queue id="test_queue" durable="true" auto-delete="false" exclusive="false" name="test_queue" />
4、訊息失效後監聽佇列,6000 為 時間間隔資訊 60s int或long型別,解決優先順序問題
<rabbit:queue id="test_delay_queue" durable="true" auto-delete="false" exclusive="false" name="test_delay_queue">
<rabbit:queue-arguments>
<entry key="x-message-ttl">
<value type="java.lang.Long">60000</value>
</entry>
<entry key="x-dead-letter-exchange" value="test_Exchange"/>
</rabbit:queue-arguments>
</rabbit:queue>
5、rabbitmq的三種模式:direct,fanout,topic 三種
direct 訊息轉換佇列 繫結key,意思就是訊息與一個特定的路由鍵匹配,會轉發。rabbit:binding:設定訊息queue匹配的key。
<rabbit:direct-exchange name="test_Exchange" durable="true" auto-delete="false" id="test_Exchange">
<rabbit:bindings>
<rabbit:binding queue="test_queue" key="test_key" />
</rabbit:bindings>
</rabbit:direct-exchange>
fanout 模式:客戶端中只要是與該路由繫結在一起的佇列都會收到相關訊息,這類似廣播,傳送端不管佇列是誰,都由客戶端自己去繫結,誰需要資料誰去繫結自己的相應佇列。
<rabbit:fanout-exchange name="delayed_message_exchange" durable="true" auto-delete="false" id="delayed_message_exchange">
<rabbit:bindings>
<rabbit:binding queue="test_delay_queue"/>
</rabbit:bindings>
</rabbit:fanout-exchange>
topic 模式:傳送端不是按固定的routing key傳送訊息,而是按字串“匹配”傳送,接收端同樣如此。
<binding queue="testqueue" pattern="*.*.test1"
<rabbit:topic-exchange name="message-exchange" durable="true" auto-delete="false" id="message-exchange">
<rabbit:bindings>
<rabbit:binding queue="test_queue" pattern="test_key.*.*" />
</rabbit:bindings>
</rabbit:topic-exchange>
6、生產者(傳送端)程式碼:
@Resource
private RabbitTemplate rabbitTemplate;
public void sendMessage(CommonMessage msg){
try {
logger.error("傳送資訊開始");
System.out.println(rabbitTemplate.getConnectionFactory().getHost());
//傳送資訊 message-exchange 交換機 msg.getSource() 為 test_key
rabbitTemplate.convertAndSend("message-exchange",msg.getSource(), msg);
logger.error("傳送資訊結束");
} catch (Exception e) {
e.printStackTrace();
}
}
三、消費者配置:同一個專案中 consumer.xml
1、連線服務配置
<rabbit:connection-factory id="connectionFactory" host="${rmq.ip}" username="${rmq.manager.user}" password="${rmq.manager.password}" port="${rmq.port}" />
<rabbit:admin connection-factory="connectionFactory" />
2、spring amqp預設的是jackson 的一個外掛,目的將生產者生產的資料轉換為json存入訊息佇列,由於fastjson的速度快於jackson,這裡替換為fastjson的一個實現
<bean id="jsonMessageConverter" class="com.jy.utils.FastJsonMessageConverter"></bean>
3、自定義介面類
<bean id="testHandler" class="com.rabbit.TestHandler"></bean>
4、queue 佇列宣告 需要傳送訊息到哪些佇列 ,訊息系統監聽佇列
<rabbit:queue id="test_queue" durable="true" auto-delete="false" exclusive="false" name="test_queue" />
5、topic 模式 繫結。注意:此處僅寫一種模式;根據需求可以配多種模式。
<rabbit:topic-exchange name="message-exchange" durable="true" auto-delete="false" id="message-exchange">
<rabbit:bindings>
<rabbit:binding queue="test_queue" pattern="test_key" />
</rabbit:bindings>
</rabbit:topic-exchange>
6、用於訊息的監聽的代理類MessageListenerAdapter
<bean id="testQueueListenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter" >
<constructor-arg ref="testHandler" />//類名
<property name="defaultListenerMethod" value="handlerTest"></property>//方法名
<property name="messageConverter" ref="jsonMessageConverter"></property>
</bean>
7、配置監聽 acknowledeg = "manual" 設定手動應答 當訊息處理失敗時:會一直重發 直到訊息處理成功,監聽容器
acknowledge="auto" concurrency="30" 設定傳送次數,最多傳送30次
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" concurrency="20">
<rabbit:listener queues="test_queue" ref="testQueueListenerAdapter" />
</rabbit:listener-container>
8、消費端程式碼:TestHandler 類
public class TestHandler {
@Override
public void handlerTest(CommonMessage commonMessage) {
System.out.println("DetailQueueConsumer: " + new String(message.getBody()));
}
}
總結:這是rabbitmq 從生產者到消費者 的整個流程配置,能夠直接在web開發中使用,博主親測使用;如果還有不明白的小夥伴,可以留言,博主定會詳細回覆!