1. 程式人生 > >RabbitMQ之訊息確認機制AMQP事務

RabbitMQ之訊息確認機制AMQP事務

概述

我們在RabbitMQ中可以通過持久化來解決伺服器掛掉而丟失資料問題,但是大家有沒有想過,我的訊息到達了RabbitMQ伺服器了嗎??? 我們是不知道的,導致的問題就是 如果訊息在到達伺服器之前就丟失了,持久化也是不能解決問題的!
那怎麼辦?
我們有兩種方式:
1.通過AMQP協議的事務機制來實現訊息的確認
2.confirm模式;

事務機制

RabbitMQ中提供了3個方法:txSelect(), txCommit()以及txRollback(),其類似於jdbc中的事務開啟 提交 回滾;
txSelect用於將當前channel設定成transaction模式,
txCommit用於提交事務,
txRollback用於回滾事務,

在通過txSelect開啟事務之後,我們便可以釋出訊息給RabbitMQ伺服器了,如果txCommit提交成功了,則訊息一定到達了broker了,如果在txCommit執行之前broker異常崩潰或者由於其他原因丟擲異常,這個時候我們便可以捕獲異常通過txRollback回滾事務了。

程式碼:

channel.txSelect();
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
channel.txCommit();

下面來舉個例子,因為訊息確認機制是對於生產者 ,我們這裡只討論生產者的程式碼

生產者

public class SendMQ {
    private static final String QUEUE_NAME  = "QUEUE_simple";

    @Test
    public void sendMsg() throws IOException, TimeoutException {
        /* 獲取一個連線 */
        Connection connection = ConnectionUtils.getConnection();

        /* 從連線中建立通道 */
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false
, false, false, null); String msg = "Hello Simple QUEUE !"; try { channel.txSelect(); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); int result = 1 / 0; channel.txCommit(); } catch (Exception e) { channel.txRollback(); System.out.println("----msg rollabck "); }finally{ System.out.println("---------send msg over:" + msg); } channel.close(); connection.close(); } }

消費者

public class Consumer {
    private static final String QUEUE_NAME  = "QUEUE_simple";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            //獲取到達的訊息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        //監聽佇列
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

通過測試:此種模式還是很耗時的,因為內部走了多次通訊,所以採用這種方式 降低了Rabbitmq的訊息吞吐量