1. 程式人生 > >RabbitMQ之佇列與訊息持久化

RabbitMQ之佇列與訊息持久化

佇列持久化

在之前的例子中,我們所用的佇列都是臨時佇列,當服務重啟後之前建立的佇列就都沒有了。

佇列的持久化是在定義佇列時的第二個引數決定的(false為佇列不用持久化)

  1. channel.queueDeclare(queueName, falsefalse,false,null);  
     如果持久化標誌設定為true,則代表是一個持久的佇列,那麼在服務重啟後,也會存在。因為服務會把持久化的queue存放在硬碟上,當服務重啟的時候,會重新申明之前被持久化的queue。佇列是可以被持久化,但是裡面的訊息是否為持久化那還要看訊息的持久化設定。也就是說,如果重啟之前那個queue裡面還有沒有發出去的訊息的話,重啟之後那佇列裡面是不是還存在原來的訊息,這個就要取決於傳送者在傳送訊息時對訊息的設定了。

訊息持久化

如果要在重啟後保持訊息的持久化必須設定訊息是持久化的標誌
  1. channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());  

示例程式碼

傳送端:
  1. import java.io.IOException;  
  2. import com.rabbitmq.client.Channel;  
  3. import com.rabbitmq.client.Connection;  
  4. import com.rabbitmq.client.ConnectionFactory;  
  5. import com.rabbitmq.client.MessageProperties;  
  6. /** 
  7.  * 訊息持久化 
  8.  * @author menghh 
  9.  * 
  10.  */
  11. publicclass Send04 {  
  12.     publicstaticvoid main(String[] args) throws IOException {  
  13.         ConnectionFactory factory = new ConnectionFactory();  
  14.         //RabbitMQ-Server安裝在本機,所以直接用127.0.0.1
  15.         factory.setHost("127.0.0.1"
    );  
  16.         //建立一個連線
  17.         Connection conn = factory.newConnection();  
  18.         //建立一個通訊通道
  19.         Channel channel = conn.createChannel();  
  20.         //定義Queue名稱
  21.         String queueName = "queue01";  
  22.         //為Channel定義queue的屬性,queueName為queue名稱
  23.         channel.queueDeclare(queueName, truefalse,false,null);  
  24.         String msg = "Hello World!";  
  25.         //傳送訊息
  26.         /** 
  27.          * 測試條件:1、在訊息佇列持久化的前提下2、接收訊息方設定接收方式為手動接收,並不對接收訊息進行確認 
  28.          * 不採用訊息持久化,重啟RabbitMQ服務後,訊息佇列存在,訊息接收不到 
  29.          * 採用訊息持久化,重啟RabbitMQ服務後,訊息佇列存在,訊息依然可以接收到訊息,說明訊息被持久化 
  30.          */
  31.         channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());  
  32. //      channel.basicPublish("", queueName, null, msg.getBytes());
  33.         System.out.println("send message["+msg+"] to "+queueName+"success!");  
  34.         //關閉通道
  35.         channel.close();  
  36.         //關閉連線
  37.         conn.close();  
  38.     }  
  39. }  


接收端(跟之前程式碼一樣)
  1. import com.rabbitmq.client.Channel;  
  2. import com.rabbitmq.client.Connection;  
  3. import com.rabbitmq.client.ConnectionFactory;  
  4. import com.rabbitmq.client.QueueingConsumer;  
  5. import com.rabbitmq.client.QueueingConsumer.Delivery;  
  6. /** 
  7.  * 訊息持久化 
  8.  * @author menghh 
  9.  * 
  10.  */
  11. publicclass Recv04 {  
  12.     publicstaticvoid main(String[] args) throws Exception{  
  13.         ConnectionFactory factory = new ConnectionFactory();   
  14.         factory.setHost("127.0.0.1");  
  15.         Connection conn = factory.newConnection();  
  16.         Channel channel = conn.createChannel();  
  17.         String queueName = "queue01";  
  18.         channel.queueDeclare(queueName, truefalsefalsenull);  
  19.         //以上部分和sender一樣
  20.         //配置好獲取訊息得方式
  21.         QueueingConsumer consumer =  new QueueingConsumer(channel);  
  22.         channel.basicConsume(queueName, false,consumer);  
  23.         //迴圈獲取訊息
  24.         while(true){  
  25.             //獲取訊息,如果沒有訊息,這一步將會一直阻塞
  26.             Delivery delivery = consumer.nextDelivery();  
  27.             String msg = new String(delivery.getBody());  
  28.             //確認訊息,已經收到  
  29.             channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);    
  30.             System.out.println("received message["+msg+"] from "+queueName);  
  31.         }  
  32.     }  
  33. }  

測試結果:

執行程式後,佇列存在,重啟RabbitMQ Server後佇列依然存在

訊息持久化的測試方法: 把消費者中確認接收訊息的程式碼註釋掉(前邊提到過該操作),啟動傳送訊息程式,重啟RabbitMQ Server後訊息依然可以接收到