1. 程式人生 > >RabbitMQ使用場景練習:訊息確認機制(十一)

RabbitMQ使用場景練習:訊息確認機制(十一)

  • 訊息確認機制
RabbitMQ提供了transaction、confirm兩種訊息確認機制。transaction即事務機制,手動提交和回滾;confirm機制提供了Confirmlistener和waitForConfirms兩種方式。confirm機制效率明顯會高於transaction機制,但後者的優勢在於強一致性。如果沒有特別的要求,建議使用conrim機制。 

1、從實驗來看,訊息的確認機制只是確認publisher傳送訊息到broker,由broker進行應答,不能確認訊息是否有效消費。 
2、而為了確認訊息是否被髮送給queue,應該在傳送訊息中啟用引數mandatory=true,使用ReturnListener接收未被髮送成功的訊息。 

3、接下來就需要確認訊息是否被有效消費。publisher端目前並沒有提供監聽事件,但提供了應答機制來保證訊息被成功消費,應答方式: 
   basicAck:成功消費,訊息從佇列中刪除 
   basicNack:requeue=true,訊息重新進入佇列,false被刪除 
   basicReject:等同於basicNack 
   basicRecover:訊息重入佇列,requeue=true,傳送給新的consumer,false傳送給相同的consumer 

  • 應答模式之transaction機制

需要手動提交和回滾,執行txCommit,訊息才會轉發給佇列進入ready狀態;執行txRollback,訊息被取消 

package com.demo.mq.rabbitmq.example11;  
import java.io.IOException;  
import java.io.Serializable;  
import org.apache.commons.lang3.SerializationUtils;  
import com.demo.mq.rabbitmq.MqManager;  
import com.rabbitmq.client.AMQP;  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.DefaultConsumer;  
import com.rabbitmq.client.Envelope;  
  
/** 
 * 應答模式之transaction機制 
 * @author sheungxin 
 * 
 */  
public class TxDemo {  
    private static String exchange_name="";  
    private static String queue_name="tx_queue";  
      
    /** 
     * transaction機制傳送訊息,事務機制:手動提交和回滾 
     * 執行txCommit,訊息才會轉發給佇列進入ready狀態 
     * 執行txRollback,訊息被取消 
     * @param mes 
     * @throws Exception 
     */  
    public static void txSend(Serializable mes) throws Exception{  
        Connection conn=MqManager.newConnection();  
        Channel channel=conn.createChannel();  
        //開啟transaction機制  
        channel.txSelect();  
        channel.queueDeclare(queue_name,false,false,true,null);  
        for(int i=0;i<10;i++){  
            try{  
                channel.basicPublish(exchange_name, queue_name, null, SerializationUtils.serialize(mes.toString()+i));  
                //do something  
//              int n=5/0;//試驗訊息回滾  
                channel.txCommit();//提交訊息  
                System.out.println("釋出訊息"+mes.toString()+i);  
            }catch(Exception e){  
                channel.txRollback();//異常,取消訊息  
                System.out.println("回滾訊息"+mes.toString()+i);  
            }  
        }  
    }  
  
    /** 
     * transaction機制接收訊息,事務機制:手動提交和回滾 
     * 消費者需要執行basicAck,並txCommit(自動應答模式自動處理,本例中採用手動應答模式) 
     * @throws Exception 
     */  
    public static void txRecv() throws Exception{  
        Connection conn=MqManager.newConnection();  
        Channel channel=conn.createChannel();  
        //開啟transaction機制  
        channel.txSelect();  
        channel.queueDeclare(queue_name,false,false,true,null);  
        //關閉自動應答模式(自動應答模式不需要ack、txCommit),需要手動basicAck,並執行txCommit  
        channel.basicConsume(queue_name, false, new DefaultConsumer(channel){  
            @Override  
            public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException{  
                String mes=SerializationUtils.deserialize(body);  
                System.out.println("tx Received :'"+mes+"' done");  
                channel.basicAck(envelope.getDeliveryTag(), false);  
                channel.txCommit();  
                  
            }  
        });  
    }  
      
    public static void main(String[] args) throws Exception {  
        txSend("hello world!");  
        txRecv();  
    }  
}  
  • 應答模式之confirm機制
1、確認publisher傳送訊息到broker,由broker進行應答(不能確認是否被有效消費) 
2、confirmSelect,進入confirm訊息確認模式,確認方式:1、非同步ConfirmListener;2、同步waitForConfirms 
3、ConfirmListener、waitForConfirms均需要配合confirm機制使用 
4、暫時未弄明白confirm機制在consumer的應用,ConfirmListener在consumer中無效
5、basicNack、basicReject:引數requeue=true時,訊息會重新進入佇列 
6、autoDelete佇列在消費者關閉後不管是否還有未處理的訊息都會關閉掉
package com.demo.mq.rabbitmq.example11;  
import java.io.IOException;  
import java.io.Serializable;  
import org.apache.commons.lang3.SerializationUtils;  
import com.demo.mq.rabbitmq.MqManager;  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.ConfirmListener;  
import com.rabbitmq.client.Connection;  
  
/** 
 * 應答模式之confirm機制:訊息傳送 
 * @author sheungxin 
 * 
 */  
public class ConfirmSend {  
    private static String exchange_name="";  
    private static String queue_name="tx_queue";  
      
    /** 
     * confirm機制:確認publisher傳送訊息到broker,由broker進行應答(不能確認是否被有效消費) 
     * confirmSelect,進入confirm訊息確認模式,確認方式:1、非同步ConfirmListener;2、同步waitForConfirms 
     * ConfirmListener、waitForConfirms均需要配合confirm機制使用 
     * @param mes 
     * @throws Exception 
     */  
    public static void txSend(Serializable mes) throws Exception{  
        Connection conn=MqManager.newConnection();  
        Channel channel=conn.createChannel();  
        //開啟transaction機制  
        channel.confirmSelect();  
        channel.queueDeclare(queue_name,false,false,true,null);  
        //非同步實現傳送訊息的確認(此部分的訊息確認是指傳送訊息到佇列,並非確認訊息的有效消費)  
        channel.addConfirmListener(new ConfirmListener() {  
              
            @Override  
            public void handleNack(long deliveryTag, boolean multiple)  
                    throws IOException {  
                //multiple:測試發現multiple隨機true或false,原因未知  
                System.out.println("Nack deliveryTag:"+deliveryTag+",multiple:"+multiple);  
            }  
              
            @Override  
            public void handleAck(long deliveryTag, boolean multiple)  
                    throws IOException {  
                System.out.println("Ack deliveryTag:"+deliveryTag+",multiple:"+multiple);  
            }  
        });  
        for(int i=0;i<10;i++){  
            channel.basicPublish(exchange_name, queue_name, null, SerializationUtils.serialize(mes.toString()+i));  
        }  
//      channel.waitForConfirms();//同步實現傳送訊息的確認  
        System.out.println("-----------");  
        channel.close();  
        conn.close();  
    }  
      
    public static void main(String[] args) throws Exception {  
        txSend("hello world!");  
    }  
}  
package com.demo.mq.rabbitmq.example11;  
import java.io.IOException;  
import org.apache.commons.lang3.SerializationUtils;  
import com.demo.mq.rabbitmq.MqManager;  
import com.rabbitmq.client.AMQP;  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.DefaultConsumer;  
import com.rabbitmq.client.Envelope;  
  
/** 
 * 應答模式之confirm機制:訊息接收 
 * @author sheungxin 
 * 
 */  
public class ConfirmRecv {  
    private static String queue_name="tx_queue";  
  
    /** 
     * confirm機制:暫時未弄明白confirm機制在consumer的應用,ConfirmListener在consumer中無效 
     * basicNack、basicReject:引數requeue=true時,訊息會重新進入佇列 
     * autoDelete佇列在消費者關閉後不管是否還有未處理的訊息都會關閉掉 
     * @throws Exception 
     */  
    public static void txRecv() throws Exception{  
        Connection conn=MqManager.newConnection();  
        Channel channel=conn.createChannel();  
        //開啟transaction機制  
//      channel.confirmSelect();  
        //autoDelete,true只要被訊息  
        channel.queueDeclare(queue_name,false,false,true,null);  
        //關閉自動應答模式  
        channel.basicConsume(queue_name, false, new DefaultConsumer(channel){  
            @Override  
            public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException{  
                String mes=SerializationUtils.deserialize(body);  
                //multiple批量提交,true提交小於引數中tag訊息  
                long n=envelope.getDeliveryTag()%3;  
                if(n==0){  
                    channel.basicAck(envelope.getDeliveryTag(), false);  
                }else if(n==1){  
                    //requeue,true重新進入佇列  
                    channel.basicNack(envelope.getDeliveryTag(), false, true);  
                }else{  
                    //requeue,true重新進入佇列,與basicNack差異缺少multiple引數  
                    channel.basicReject(envelope.getDeliveryTag(), true);  
                }  
                try {  
                    Thread.sleep(2*1000);  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
                System.out.println((n==0?"Ack":n==1?"Nack":"Reject")+" mes :'"+mes+"' done");  
            }  
        });  
    }  
      
    public static void main(String[] args) throws Exception {  
        txRecv();  
    }  
}  

http://blog.csdn.net/azhegps/article/details/53815201