1. 程式人生 > >RabbitMQ訊息的持久化

RabbitMQ訊息的持久化

RabbitMQ訊息持久化需要將訊息和佇列都持久化

佇列持久化

//為Channel定義queue的屬性,queueName為queue名稱  第二個引數持久化標誌,為true表示持久化
channel.queueDeclare(queueName, true, false,false,null); 
訊息持久化
/** 
* 測試條件:1、在訊息佇列持久化的前提下2、接收訊息方設定接收方式為手動接收,並不對接收訊息進行確認 
* 不採用訊息持久化,重啟RabbitMQ服務後,訊息佇列存在,訊息接收不到 
 * 採用訊息持久化,重啟RabbitMQ服務後,訊息佇列存在,訊息依然可以接收到訊息,說明訊息被持久化 
*/  
 channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
傳送端
package cn.rabbitmq.disk;

import java.io.IOException;

import cn.rabbitmq.util.ConnectionUtil;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;

public class SendTest {

private final static String EXCHANGE_NAME = "logs1";  
    
    public static void main(String[] args) throws IOException {  
        /** 
         * 建立連線連線到MabbitMQ 
         */  
        Connection connection = ConnectionUtil.getConnection();
        // 建立一個頻道  
        Channel channel = connection.createChannel();  
       
        String queueName = "queue01";  
        //為Channel定義queue的屬性,queueName為queue名稱  第二個引數持久化標誌,為true表示持久化
        channel.queueDeclare(queueName, true, false,false,null);  
        String msg = "Hello World!";  
        //傳送訊息  
        /** 
         * 測試條件:1、在訊息佇列持久化的前提下2、接收訊息方設定接收方式為手動接收,並不對接收訊息進行確認 
         * 不採用訊息持久化,重啟RabbitMQ服務後,訊息佇列存在,訊息接收不到 
         * 採用訊息持久化,重啟RabbitMQ服務後,訊息佇列存在,訊息依然可以接收到訊息,說明訊息被持久化 
         */  
        channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());  
//      channel.basicPublish("", queueName, null, msg.getBytes());  
        System.out.println("send message["+msg+"] to "+queueName+"success!");  
        //關閉通道  
        channel.close();  
        //關閉連線  
        connection.close();  
    }
}
接收端
package cn.rabbitmq.disk;

import cn.rabbitmq.util.ConnectionUtil;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

public class RecTest {
    public static void main(String[] args) throws Exception{  
        Connection connection = ConnectionUtil.getConnection();
        // 建立一個頻道  
        Channel channel = connection.createChannel();  
     
        String queueName = "queue01";  
        channel.queueDeclare(queueName, true, false, false, null);  
        //以上部分和sender一樣  
        //配置好獲取訊息得方式  
        QueueingConsumer consumer =  new QueueingConsumer(channel);  
        channel.basicConsume(queueName, false,consumer);  
        //迴圈獲取訊息  
        while(true){  
            //獲取訊息,如果沒有訊息,這一步將會一直阻塞  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());  
            //確認訊息,已經收到    
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);    
            System.out.println("received message["+msg+"] from "+queueName);  
        }  
    } 
}