九、rabbitMQ的訊息確認機制之事務機制
阿新 • • 發佈:2019-02-13
說明:在rabbitMQ中,我們為了解決伺服器異常導致資料丟失的問題,我們可以採用rabbitMQ的持久化機制,但是我們如何確定生產者將訊息傳送給了rabbitMQ呢,那麼我們採用兩種協議的模式。
(1)、AMQP實現了事務機制
(2)、confirm模式
一、事務機制
txSelect:使用者將當前channel設定為transation模式
txCommit:用於提交事務
txRollback:用於回滾事務
二、例項
(1)生產者:
import java.io.IOException; import java.util.concurrent.TimeoutException; import com.demo.rabbitMQ.util.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class TransationProduce { private static final String QUEUE_NAME="test_queue_transation"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection=null; Channel channel=null; try { //獲取連線 connection = ConnectionUtils.getConnection(); //建立通道 channel = connection.createChannel(); //宣告佇列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); String msg="hello tx"; //開啟事務模式 channel.txSelect(); //釋出訊息 channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); //提交事務 channel.txCommit(); } catch (Exception e) { channel.txRollback(); System.out.println("send message rollback"); }finally { channel.close(); connection.close(); } } } |
(2) 消費者
import java.io.IOException; import java.util.concurrent.TimeoutException; import com.demo.rabbitMQ.util.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties; public class Consumer { private static final String QUEUE_NAME="test_queue_transation"; public static void main(String[] args) throws IOException, TimeoutException { //獲取連線 Connection connection = ConnectionUtils.getConnection(); //獲取通道 Channel channel = connection.createChannel(); //訊息持久化 boolean durable=false; //宣告佇列 channel.queueDeclare(QUEUE_NAME, durable, false, false, null); //保證一次只分發一個 channel.basicQos(1); //定義一個消費者 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { //一旦有訊息就會觸發該方法 @Override public void handleDelivery(String consumerTag,Envelope envelope,BasicProperties basicProperties,byte[] body) throws IOException { String msg=new String(body,"utf-8"); System.out.println("===========:"+msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("=================consumer1 over================"); //手動應答 channel.basicAck(envelope.getDeliveryTag(), false); } } }; //關閉自動應答 boolean autoAck=false; //監聽佇列 channel.basicConsume(QUEUE_NAME, autoAck, defaultConsumer); } } |
注意:這種模式走的通訊太多,大量請求發往rabbitMQ,降低rabbitMQ的吞吐量。
原始碼地址:https://github.com/Carlutf8/rabbitMQ