1. 程式人生 > >RabbitMQ系列—Java操作之事務模式和Confirm模式

RabbitMQ系列—Java操作之事務模式和Confirm模式

在之前介紹到了RabbitMQ的訊息持久化和消費者端手動確認,解決了消費者異常導致的資料丟失問題,那麼我們如何確定生產者生產的訊息已經被髮送到rabbitmq伺服器了呢?通俗點說,如果訊息經過交換器進入佇列就可以完成訊息的持久化,但如果訊息在沒有到達broker之前出現意外,那就造成訊息丟失,有沒有辦法可以解決這個問題?有兩種方式:

  • 通過AMQP協議,AMQP協議實現了事務機制。
  • 通過Confirm模式

AMQP的事務機制

事務的實現主要是對通道(Channel)的設定,主要的方法有三個:

  • channel.txSelect():宣告啟動事務模式
  • channel.txComment():提交事務
  • channel.txRollback():回滾事務

生產者Sender

public class Sender {
	private static final String QUEUE = "test_tx_queue";

	public static void main(String[] args) {
		Connection con = null;
		Channel channel = null;
		try {
			// 獲取連線
			con = ConnectionUtils.getConnection();
			// 從連線中建立通道
			channel = con.createChannel();
			// 宣告一個佇列
			channel.queueDeclare(QUEUE, false, false, false, null);
			// 訊息內容
			String msg = "tx queue hello!";
			// 開啟事務
			channel.txSelect();
			// 傳送訊息
			channel.basicPublish("", QUEUE, null, msg.getBytes());
			// 模擬異常
			int num = 1/0;
			// 提交事務
			channel.txCommit();
			System.out.println("send success");
		} catch (Exception e) {
			// 事務回滾
			try {
				channel.txRollback();
			} catch (IOException e1) {
				e1.printStackTrace();
			}
			e.printStackTrace();
		} finally {
			// 關閉連線
			ConnectionUtils.close(channel, con);
		}

	}

}

消費者Recver

public class Recver {
	private static final String QUEUE = "test_tx_queue";

	public static void main(String[] args) throws IOException, TimeoutException {
		// 獲取連線
		Connection con = ConnectionUtils.getConnection();
		// 從連線中建立通道
		Channel channel = con.createChannel();
		// 宣告佇列
		channel.queueDeclare(QUEUE, false, false, false, null);
		// 建立消費者
		Consumer consumer = new DefaultConsumer(channel) {
			// 獲取訊息
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				String msg = new String(body, "utf-8");
				System.out.println("接收到訊息——" + msg);
			}

		};
		// 監聽佇列
		channel.basicConsume(QUEUE, true, consumer);
	}
}

執行消費者和生產者,生產者報錯而且消費者也沒有收到訊息,說明訊息已經被回滾了。

消費者模式使用事務

我們知道,消費者可以使用訊息自動或手動傳送來確認消費訊息,那如果我們在消費者模式中使用事務(當然如果使用了手動確認訊息,完全用不到事務的),會發生什麼呢?

結果分為兩種情況:

  • autoAck=false手動應對的時候是支援事務的,也就是說即使你已經手動確認了訊息已經收到了,但在確認訊息會等事務的返回解決之後,在做決定是確認訊息還是重新放回佇列,如果你手動確認現在之後,又回滾了事務,那麼已事務回滾為主,此條訊息會重新放回佇列。
  • autoAck=true如果自定確認為true的情況是不支援事務的,也就是說你即使在收到訊息之後在回滾事務也是於事無補的,佇列已經把訊息移除了。

這種事務模式有個缺陷:效能差,降低了rabbitmq的訊息吞吐量,使用了事務模式比非事務模式效能差很多,那麼有沒有既能保證訊息的可靠性又能兼顧效能的解決方案呢?那就是下面的Confirm模式。

Confirm模式

Confirm傳送方確認模式使用和事務類似,也是通過設定Channel進行傳送方確認的。

實現原理

將Channel設定為Confirm模式後,此Channel傳送的每條訊息都會有標識這條訊息的ID(從1開始),當r訊息投放到匹配的佇列後,broker會返回一個確認資訊(包含訊息的唯一ID)給生產者通知生產者已經成功傳送到佇列。如果訊息和佇列是可持久化的,在佇列將訊息寫人到磁碟後再返回給生產者確認資訊。broker回傳給生產者的確認訊息中deliver-tag域包含了確認訊息的序列號,此外broker也可以設定basic.ack的multiple域,表示這個序列號之前的所有訊息都已經得到了處理。

三種程式設計方式

  • 序列confirm模式:peoducer每傳送一條訊息後,呼叫waitForConfirms()方法,等待broker端confirm。
  • 批量confirm模式:producer每傳送一批訊息後,呼叫waitForConfirms()方法,等待broker端confirm。
  • 非同步confirm模式:提供一個回撥方法,broker confirm了一條或者多條訊息後producer端會回撥這個方法。

Confirm模式最大的優點就是它是非同步的。

序列confirm模式

生產者SingleSender

public class SingleSender {
	private static final String QUEUE = "test_confirm_queue";

	public static void main(String[] args) {
		Connection con = null;
		Channel channel = null;
		try {
			// 獲取連線
			con = ConnectionUtils.getConnection();
			// 從連線中建立通道
			channel = con.createChannel();
			// 宣告一個佇列
			channel.queueDeclare(QUEUE, false, false, false, null);
			// 訊息內容
			String msg = "confirm queue hello!";
			// 將Channel設定為Confirm模式
			channel.confirmSelect();
			// 傳送訊息
			channel.basicPublish("", QUEUE, null, msg.getBytes());
			// 訊息確認
			if(channel.waitForConfirms()){
				System.out.println("send success");
			}else{
				System.out.println("send fail");
			}
			
		} catch (IOException e) {
			e.printStackTrace();
		} catch (TimeoutException e) {
			e.printStackTrace();
		}catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			// 關閉連線
			ConnectionUtils.close(channel, con);
		}

	}
}

消費者Recver

public class Recver{
	private static final String QUEUE = "test_confirm_queue";

	public static void main(String[] args) throws IOException, TimeoutException {
		// 獲取連線
		Connection con = ConnectionUtils.getConnection();
		// 從連線中建立通道
		Channel channel = con.createChannel();
		// 宣告佇列
		channel.queueDeclare(QUEUE, false, false, false, null);
		// 建立消費者
		Consumer consumer = new DefaultConsumer(channel) {
			// 獲取訊息
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				String msg = new String(body, "utf-8");
				System.out.println("接收到訊息——" + msg);
			}

		};
		// 監聽佇列
		channel.basicConsume(QUEUE, true, consumer);
	}
}

普通模式需要一條一條確認,效能慢,可以選擇批量模式。

批量confirm模式

生產者BatchSender

public class BatchSender {
	private static final String QUEUE = "test_confirm_queue";

	public static void main(String[] args) {
		Connection con = null;
		Channel channel = null;
		try {
			// 獲取連線
			con = ConnectionUtils.getConnection();
			// 從連線中建立通道
			channel = con.createChannel();
			// 宣告一個佇列
			channel.queueDeclare(QUEUE, false, false, false, null);
			// 將Channel設定為Confirm模式
			channel.confirmSelect();
			for (int i = 0; i < 20; i++) {
				// 訊息內容
				String msg = "confirm queue hello!";
				// 傳送訊息
				channel.basicPublish("", QUEUE, null, msg.getBytes());
			}
			// 訊息確認
			if (channel.waitForConfirms()) {
				System.out.println("send success");
			} else {
				System.out.println("send fail");
			}

		} catch (IOException e) {
			e.printStackTrace();
		} catch (TimeoutException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			// 關閉連線
			ConnectionUtils.close(channel, con);
		}

	}
}

通過迴圈批量傳送20條訊息,但只在控制檯輸出了一行“發send success”,該方法會等到最後一條訊息得到ack或者得到nack才會結束,也就是說在waitForConfirms處會造成當前程式的阻塞,這點我們看出broker端預設情況下是進行批量回復的,並不是針對每條訊息都發送一條ack訊息。

缺陷

一批資料中有一條訊息傳送失敗會都回滾。

非同步模式

普通模式和批量模式都是序列的、同步執行的,如果訊息傳送出去沒有返回確認訊息會一直等待,而非同步模式執行效率高,不需要等待訊息執行完,只需要監聽訊息即可。

生產者AsyncSender

public class AsyncSender {
	private static final String QUEUE = "test_confirm_queue";

	public static void main(String[] args) {
		Connection con = null;
		Channel channel = null;
		try {
			// 獲取連線
			con = ConnectionUtils.getConnection();
			// 從連線中建立通道
			channel = con.createChannel();
			// 宣告一個佇列
			channel.queueDeclare(QUEUE, false, false, false, null);
			// 將Channel設定為Confirm模式
			channel.confirmSelect();
			// 非同步監聽確認和未確認的訊息
			channel.addConfirmListener(new ConfirmListener() {
				/**
				 * 處理返回確認成功
				 * 
				 * @param deliveryTag
				 *            如果是多條,這個就是最後一條訊息的tag
				 * @param multiple
				 *            是否多條 true是false否
				 * @throws IOException
				 */

				public void handleAck(long deliveryTag, boolean multiple) throws IOException {
					System.out.println("ack:deliveryTag:" + deliveryTag + ",multiple:" + multiple);
				}

				/**
				 * 處理返回確認失敗
				 * 
				 * @param deliveryTag
				 * @param multiple
				 * @throws IOException
				 */
				public void handleNack(long deliveryTag, boolean multiple) throws IOException {
					System.out.println("nack:deliveryTag:" + deliveryTag + ",multiple:" + multiple);
				}
			});
			for (int i = 0; i < 50; i++) {
				// 訊息內容
				String msg = "confirm queue hello!" + i;
				// long tag = channel.getNextPublishSeqNo();
				// 傳送訊息
				channel.basicPublish("", QUEUE, null, msg.getBytes());
				// System.out.println("訊息tag" + tag);
			}

			System.out.println("執行結束");
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			// 關閉連線
			ConnectionUtils.close(channel, con);
		}

	}

}

執行後控制檯列印

ack:deliveryTag:2,multiple:true
ack:deliveryTag:13,multiple:true
ack:deliveryTag:14,multiple:false
ack:deliveryTag:15,multiple:false
ack:deliveryTag:16,multiple:false
ack:deliveryTag:18,multiple:true
ack:deliveryTag:20,multiple:true
ack:deliveryTag:22,multiple:true
ack:deliveryTag:23,multiple:false
ack:deliveryTag:24,multiple:false
ack:deliveryTag:25,multiple:false
ack:deliveryTag:26,multiple:false
ack:deliveryTag:27,multiple:false
ack:deliveryTag:28,multiple:false
執行結束
ack:deliveryTag:29,multiple:false
ack:deliveryTag:34,multiple:true

可以看到,傳送50條訊息,收到的ack個數不一樣多次執行程式會發現每次傳送回來的ack訊息中的deliveryTag域的值並不是一樣的,說明broker端批量回傳給傳送者的ack訊息並不是以固定的批量大小回傳的。

效能比較

事務模式效能是最差的,普通confirm模式效能比事務模式稍微好點,但是和批量confirm模式還有非同步confirm模式相比,還是小巫見大巫。批量confirm模式的問題在於confirm之後返回false之後進行重發這樣會使效能降低,非同步confirm模式(async)程式設計模型較為複雜,至於採用哪種方式具體看實際情況。
注意:AMQP的事務模式和Confirm模式不能一起使用。