【RabbitMq】rabbitMq訊息確認機制
阿新 • • 發佈:2018-11-13
一、提出問題
生產者將訊息傳送出去後,訊息是否到達RabbitMq伺服器呢?預設的情況下,是不知道的
二、引入訊息確認機制
兩種方式:
1.AMQP實現事務機制
2.confirm模式
三、AMQP實現事務機制
3.1 簡單示例
txSelect():用於將當前channel設定成transaction(事務)模式
txCommit():提交事務
txRollback():回滾事務
package com.wj.transation.config;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Connection helper used by the transaction examples.
 *
 * <p>Author: wj, created 2018/11/2.
 */
public class MqConfig {

    /**
     * Opens a new connection to the broker running on {@code localhost}.
     *
     * @return the connection produced by {@link ConnectionFactory#newConnection()}
     * @throws Exception if the connection cannot be created
     */
    public static Connection getConnection() throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        return connectionFactory.newConnection();
    }
}
生產者:
package com.wj.transation;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wj.transation.config.MqConfig;

import java.nio.charset.StandardCharsets;

/**
 * Producer that uses an AMQP transaction as its message-confirmation mechanism.
 *
 * <p>Author: wj, created 2018/11/5.
 */
public class TxSend {

    private static final String QUEUE_NAME = "test_queue_tx";

    /**
     * Publishes one message inside a transaction and rolls the transaction back when
     * anything between {@code txSelect()} and {@code txCommit()} throws.
     *
     * <p>The connection is always closed before this method returns, including when
     * declaring the queue or rolling back fails.
     *
     * @throws Exception if connecting, declaring the queue, rolling back or closing fails
     */
    public static void send() throws Exception {
        Connection connection = MqConfig.getConnection();
        try {
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String msg = "hello tx!";
            try {
                // Switch the current channel into transaction mode.
                channel.txSelect();
                // Encode explicitly as UTF-8: the consumer decodes the body as UTF-8.
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));
                // Deliberate failure so that the rollback path is demonstrated;
                // comment this line out to see the commit path instead.
                int x = 1 / 0;
                System.out.println("【傳送端】訊息:" + msg);
                // Commit the transaction.
                channel.txCommit();
            } catch (Exception e) {
                channel.txRollback();
                System.out.println("發生異常,回滾");
            }
            channel.close();
        } finally {
            // Runs on every path; closing the connection also closes a channel that
            // was left open by an exception above.
            connection.close();
        }
    }
}
消費者
package com.wj.transation;

import com.rabbitmq.client.*;
import com.wj.transation.config.MqConfig;

import java.io.IOException;

/**
 * Consumer side of the transaction example.
 *
 * <p>Author: wj, created 2018/11/5.
 */
public class TxRecv {

    private static final String QUEUE_NAME = "test_queue_tx";

    /**
     * Declares the queue and registers an auto-acknowledging consumer that prints the
     * body of every delivery, decoded as UTF-8.
     *
     * @throws Exception if connecting, declaring the queue or subscribing fails
     */
    public static void recv() throws Exception {
        Channel channel = MqConfig.getConnection().createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        Consumer printingConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String text = new String(body, "utf-8");
                System.out.println("[接收端]訊息:" + text);
            }
        };

        // Second argument true = autoAck.
        channel.basicConsume(QUEUE_NAME, true, printingConsumer);
    }
}
執行結果:
將刻意製造異常的那一行程式碼註釋掉後,執行結果:
3.2 缺點
使用事務機制的話會降低RabbitMQ的效能,降低了吞吐量,增加了channel的請求數,耗時
四、Confirm模式
confirm模式分為三種:(差異在於生產者;前兩種為同步(串行)確認,第三種為非同步確認)
普通模式(一個)
批處理 (多個)
非同步confirm模式
4.1普通模式
package com.wj.confirm.general;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wj.confirm.util.ConnectionUtil;
/**
 * Producer demonstrating the simple publisher-confirm mode: one message is
 * published and then confirmed synchronously.
 *
 * <p>Author: wj, created 2018/11/5.
 */
public class Send {
    private static final String QUEUE_NAME = "general_queue";

    /**
     * Publishes a single message on a confirm-mode channel, waits for the broker's
     * confirmation and prints whether the message was confirmed.
     *
     * <p>The connection is closed before returning so that the client's threads do
     * not keep the JVM alive.
     *
     * @throws Exception if connecting, publishing, waiting for the confirm or closing fails
     */
    public static void send() throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        try {
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // The producer calls confirmSelect to put the channel into confirm mode.
            channel.confirmSelect();
            String msg = "hello confirm!";
            // Encode explicitly instead of relying on the platform default charset.
            channel.basicPublish("", QUEUE_NAME, null,
                    msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            if (channel.waitForConfirms()) {
                System.out.println("訊息傳送成功!");
            } else {
                System.out.println("訊息傳送失敗!");
            }
        } finally {
            // Closing the connection also closes the channel created from it.
            connection.close();
        }
    }
}
4.2批量模式
package com.wj.confirm.batch;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wj.confirm.util.ConnectionUtil;
/**
 * Producer demonstrating the batch publisher-confirm mode: several messages are
 * published and then confirmed together with a single wait.
 *
 * <p>Author: wj, created 2018/11/5.
 */
public class Send {
    private static final String QUEUE_NAME = "batch_queue";

    /**
     * Publishes ten messages on a confirm-mode channel, waits once for the broker to
     * confirm them and prints the overall result.
     *
     * <p>The connection is closed at the end so that the program can terminate.
     *
     * @param args unused
     * @throws Exception if connecting, publishing, waiting for the confirms or closing fails
     */
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        try {
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // The producer calls confirmSelect to put the channel into confirm mode.
            channel.confirmSelect();
            // Publish the whole batch first.
            for (int i = 0; i < 10; i++) {
                String msg = "hello confirm! [" + i + "]";
                // Encode explicitly instead of relying on the platform default charset.
                channel.basicPublish("", QUEUE_NAME, null,
                        msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            }
            // One confirmation wait covers every message published above.
            if (channel.waitForConfirms()) {
                System.out.println("訊息傳送成功!");
            } else {
                System.out.println("訊息傳送失敗!");
            }
        } finally {
            // Closing the connection also closes the channel created from it.
            connection.close();
        }
    }
}
4.3非同步confirm模式
package com.wj.confirm.asynchronization;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.wj.confirm.util.ConnectionUtil;
import java.io.IOException;
import java.util.*;
/**
 * Producer demonstrating the asynchronous publisher-confirm mode: messages are
 * published continuously while a {@link ConfirmListener} receives the broker's
 * acks and nacks.
 *
 * <p>Author: wj, created 2018/11/5.
 */
public class Send {
    private static final String QUEUE_NAME = "ack_queue";

    /**
     * Publishes the same message in an endless loop and tracks the sequence numbers
     * that have not been confirmed yet. This method never returns normally; it ends
     * only when publishing throws, in which case the connection is closed first.
     *
     * @param args unused
     * @throws Exception if connecting, declaring the queue, publishing or closing fails
     */
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        try {
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // The producer calls confirmSelect to put the channel into confirm mode.
            channel.confirmSelect();
            // Sequence numbers of messages that have not been confirmed yet.
            final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
            // Register the listener that receives the broker's confirmations.
            channel.addConfirmListener(new ConfirmListener() {
                // Called when the broker acknowledges one or more messages.
                @Override
                public void handleAck(long deliverTag, boolean multiple) throws IOException {
                    System.out.println(multiple ? "---handleAck----multiple" : "false");
                    forget(confirmSet, deliverTag, multiple);
                }

                // Called when the broker negatively acknowledges one or more messages.
                // NOTE(review): nacked messages are only dropped from the tracking set
                // here; a real producer would presumably republish or report them.
                @Override
                public void handleNack(long deliverTag, boolean multiple) throws IOException {
                    System.out.println(multiple ? "---handleNack----multiple" : "false");
                    forget(confirmSet, deliverTag, multiple);
                }
            });
            String msg = "hello confirm——ack! ";
            // The text contains non-ASCII characters, so encode it explicitly; the
            // payload never changes, so encode it once outside the loop.
            byte[] payload = msg.getBytes(java.nio.charset.StandardCharsets.UTF_8);
            while (true) {
                long seqNo = channel.getNextPublishSeqNo();
                // Record the number BEFORE publishing: the confirm is delivered on
                // another thread and could otherwise arrive before the entry exists,
                // leaving a stale "unconfirmed" number behind.
                confirmSet.add(seqNo);
                channel.basicPublish("", QUEUE_NAME, null, payload);
            }
        } finally {
            // Reached only when something above throws.
            connection.close();
        }
    }

    /** Removes one confirmed tag, or every tag up to and including it when {@code multiple}. */
    private static void forget(SortedSet<Long> pending, long deliverTag, boolean multiple) {
        if (multiple) {
            pending.headSet(deliverTag + 1).clear();
        } else {
            pending.remove(deliverTag);
        }
    }
}
1,2的順序,2,1也可以,推薦是2,1
消費者:
package com.wj.confirm.asynchronization;
import com.rabbitmq.client.*;
import com.wj.confirm.util.ConnectionUtil;
import java.io.IOException;
/**
 * Consumer for the asynchronous-confirm example.
 *
 * <p>Author: wj, created 2018/11/5.
 */
public class Recv {
    private static final String QUEUE_NAME = "ack_queue";

    /**
     * Declares the queue and registers an auto-acknowledging consumer that prints the
     * body of every delivery, decoded as UTF-8.
     *
     * @param args unused
     * @throws Exception if connecting, declaring the queue or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        Channel channel = ConnectionUtil.getConnection().createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        Consumer printingConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String text = new String(body, "utf-8");
                System.out.println("[接收端]訊息:" + text);
            }
        };

        // Second argument true = autoAck.
        channel.basicConsume(QUEUE_NAME, true, printingConsumer);
    }
}
gitHub: