RabbitMQ使用場景練習:訊息確認機制(十一)
阿新 • • 發佈:2019-01-03
- 訊息確認機制
1、從實驗來看,訊息的確認機制只是確認publisher傳送訊息到broker,由broker進行應答,不能確認訊息是否有效消費。
2、而為了確認訊息是否被髮送給queue,應該在傳送訊息中啟用引數mandatory=true,使用ReturnListener接收未被髮送成功的訊息。
3、接下來就需要確認訊息是否被有效消費。publisher端目前並沒有提供監聽事件,但提供了應答機制來保證訊息被成功消費,應答方式:
basicAck:成功消費,訊息從佇列中刪除
basicNack:requeue=true,訊息重新進入佇列,false被刪除
basicReject:等同於basicNack
basicRecover:訊息重入佇列,requeue=true,傳送給新的consumer,false傳送給相同的consumer
- 應答模式之transaction機制
需要手動提交和回滾,執行txCommit,訊息才會轉發給佇列進入ready狀態;執行txRollback,訊息被取消
package com.demo.mq.rabbitmq.example11; import java.io.IOException; import java.io.Serializable; import org.apache.commons.lang3.SerializationUtils; import com.demo.mq.rabbitmq.MqManager; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; /** * 應答模式之transaction機制 * @author sheungxin * */ public class TxDemo { private static String exchange_name=""; private static String queue_name="tx_queue"; /** * transaction機制傳送訊息,事務機制:手動提交和回滾 * 執行txCommit,訊息才會轉發給佇列進入ready狀態 * 執行txRollback,訊息被取消 * @param mes * @throws Exception */ public static void txSend(Serializable mes) throws Exception{ Connection conn=MqManager.newConnection(); Channel channel=conn.createChannel(); //開啟transaction機制 channel.txSelect(); channel.queueDeclare(queue_name,false,false,true,null); for(int i=0;i<10;i++){ try{ channel.basicPublish(exchange_name, queue_name, null, SerializationUtils.serialize(mes.toString()+i)); //do something // int n=5/0;//試驗訊息回滾 channel.txCommit();//提交訊息 System.out.println("釋出訊息"+mes.toString()+i); }catch(Exception e){ channel.txRollback();//異常,取消訊息 System.out.println("回滾訊息"+mes.toString()+i); } } } /** * transaction機制接收訊息,事務機制:手動提交和回滾 * 消費者需要執行basicAck,並txCommit(自動應答模式自動處理,本例中採用手動應答模式) * @throws Exception */ public static void txRecv() throws Exception{ Connection conn=MqManager.newConnection(); Channel channel=conn.createChannel(); //開啟transaction機制 channel.txSelect(); channel.queueDeclare(queue_name,false,false,true,null); //關閉自動應答模式(自動應答模式不需要ack、txCommit),需要手動basicAck,並執行txCommit channel.basicConsume(queue_name, false, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException{ String mes=SerializationUtils.deserialize(body); System.out.println("tx Received :'"+mes+"' done"); channel.basicAck(envelope.getDeliveryTag(), false); channel.txCommit(); } }); } public static void main(String[] args) throws Exception { txSend("hello world!"); txRecv(); } }
- 應答模式之confirm機制
2、confirmSelect,進入confirm訊息確認模式,確認方式:1、非同步ConfirmListener;2、同步waitForConfirms
3、ConfirmListener、waitForConfirms均需要配合confirm機制使用
4、暫時未弄明白confirm機制在consumer的應用,ConfirmListener在consumer中無效
5、basicNack、basicReject:引數requeue=true時,訊息會重新進入佇列
6、autoDelete佇列在消費者關閉後不管是否還有未處理的訊息都會關閉掉
package com.demo.mq.rabbitmq.example11;
import java.io.IOException;
import java.io.Serializable;
import org.apache.commons.lang3.SerializationUtils;
import com.demo.mq.rabbitmq.MqManager;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
/**
* 應答模式之confirm機制:訊息傳送
* @author sheungxin
*
*/
public class ConfirmSend {
private static String exchange_name="";
private static String queue_name="tx_queue";
/**
* confirm機制:確認publisher傳送訊息到broker,由broker進行應答(不能確認是否被有效消費)
* confirmSelect,進入confirm訊息確認模式,確認方式:1、非同步ConfirmListener;2、同步waitForConfirms
* ConfirmListener、waitForConfirms均需要配合confirm機制使用
* @param mes
* @throws Exception
*/
public static void txSend(Serializable mes) throws Exception{
Connection conn=MqManager.newConnection();
Channel channel=conn.createChannel();
//開啟transaction機制
channel.confirmSelect();
channel.queueDeclare(queue_name,false,false,true,null);
//非同步實現傳送訊息的確認(此部分的訊息確認是指傳送訊息到佇列,並非確認訊息的有效消費)
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleNack(long deliveryTag, boolean multiple)
throws IOException {
//multiple:測試發現multiple隨機true或false,原因未知
System.out.println("Nack deliveryTag:"+deliveryTag+",multiple:"+multiple);
}
@Override
public void handleAck(long deliveryTag, boolean multiple)
throws IOException {
System.out.println("Ack deliveryTag:"+deliveryTag+",multiple:"+multiple);
}
});
for(int i=0;i<10;i++){
channel.basicPublish(exchange_name, queue_name, null, SerializationUtils.serialize(mes.toString()+i));
}
// channel.waitForConfirms();//同步實現傳送訊息的確認
System.out.println("-----------");
channel.close();
conn.close();
}
public static void main(String[] args) throws Exception {
txSend("hello world!");
}
}
package com.demo.mq.rabbitmq.example11;
import java.io.IOException;
import org.apache.commons.lang3.SerializationUtils;
import com.demo.mq.rabbitmq.MqManager;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
* 應答模式之confirm機制:訊息接收
* @author sheungxin
*
*/
public class ConfirmRecv {
private static String queue_name="tx_queue";
/**
* confirm機制:暫時未弄明白confirm機制在consumer的應用,ConfirmListener在consumer中無效
* basicNack、basicReject:引數requeue=true時,訊息會重新進入佇列
* autoDelete佇列在消費者關閉後不管是否還有未處理的訊息都會關閉掉
* @throws Exception
*/
public static void txRecv() throws Exception{
Connection conn=MqManager.newConnection();
Channel channel=conn.createChannel();
//開啟transaction機制
// channel.confirmSelect();
//autoDelete,true只要被訊息
channel.queueDeclare(queue_name,false,false,true,null);
//關閉自動應答模式
channel.basicConsume(queue_name, false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException{
String mes=SerializationUtils.deserialize(body);
//multiple批量提交,true提交小於引數中tag訊息
long n=envelope.getDeliveryTag()%3;
if(n==0){
channel.basicAck(envelope.getDeliveryTag(), false);
}else if(n==1){
//requeue,true重新進入佇列
channel.basicNack(envelope.getDeliveryTag(), false, true);
}else{
//requeue,true重新進入佇列,與basicNack差異缺少multiple引數
channel.basicReject(envelope.getDeliveryTag(), true);
}
try {
Thread.sleep(2*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println((n==0?"Ack":n==1?"Nack":"Reject")+" mes :'"+mes+"' done");
}
});
}
public static void main(String[] args) throws Exception {
txRecv();
}
}
http://blog.csdn.net/azhegps/article/details/53815201