1. 程式人生 > >RabbitMQ 訊息持久化、事務、Publisher的訊息確認機制

RabbitMQ 訊息持久化、事務、Publisher的訊息確認機制


RabbitMQ  訊息持久化、事務、Publisher的訊息確認機制

1. 宣告MessageQueue
在RabbitMQ中,無論是生產者傳送訊息還是消費者接受訊息,都首先需要宣告一個MessageQueue。
這就存在一個問題,是生產者宣告還是消費者宣告呢?要解決這個問題,首先需要明確:
a)消費者是無法訂閱或者獲取不存在的MessageQueue中的資訊。
b)訊息被Exchange接受以後,如果沒有匹配的Queue,則會被丟棄。

在明白了上述兩點以後,就容易理解:
如果是消費者去宣告Queue,就有可能會出現在宣告Queue之前,生產者已傳送的訊息被丟棄的隱患;
如果應用能夠通過訊息重發的機制允許訊息丟失,則使用此方案沒有任何問題。
但是如果不能接受該方案,這就需要無論是生產者還是消費者,在傳送或者接受訊息前,都需要去嘗試建立訊息佇列。
這裡有一點需要明確,如果客戶端嘗試建立一個已經存在的訊息佇列,Rabbit MQ不會做任何事情,並返回客戶端建立成功的。

如果一個消費者在一個通道中正在監聽某一個佇列的訊息,Rabbit MQ是不允許該消費者在同一個channel去宣告其他佇列的。

Rabbit MQ中,可以通過queue.declare命令宣告一個佇列,可以設定該佇列以下屬性:
a)Exclusive:排他佇列,如果一個佇列被宣告為排他佇列,該佇列僅對首次宣告它的連線可見,並在連線斷開時自動刪除。
  這裡需要注意三點:其一,排他佇列是基於連線可見的,同一連線的不同通道是可以同時訪問同一個連線建立的排他佇列的。
  其二,“首次”,如果一個連線已經聲明瞭一個排他佇列,其他連線是不允許建立同名的排他佇列的,這個與普通佇列不同。
  其三,即使該佇列是持久化的,一旦連線關閉或者客戶端退出,該排他佇列都會被自動刪除的。
  這種佇列適用於只限於一個客戶端傳送讀取訊息的應用場景。
b)Auto-delete:自動刪除,如果該佇列沒有任何訂閱的消費者的話,該佇列會被自動刪除。這種佇列適用於臨時佇列。
c)Durable:持久化,這個會在後面作為專門一個章節討論。
d)其他選項,例如如果使用者僅僅想查詢某一個佇列是否已存在,如果不存在,不想建立該佇列,
仍然可以呼叫queue.declare,只不過需要將引數passive設為true,傳給queue.declare,
如果該佇列已存在,則會返回true;如果不存在,則會返回Error,但是不會建立新的佇列。

2. 生產者傳送訊息
3. 消費者訂閱訊息    

4. 持久化

   RabbitMQ預設是不持久佇列、Exchange、Binding以及佇列中的訊息的,這意味著一旦訊息伺服器重啟,
   所有已宣告的佇列,Exchange,Binding以及佇列中的訊息都會丟失。
   通過設定Exchange和MessageQueue的durable屬性為true,可以使得佇列和Exchange持久化,
   但是這還不能使得佇列中的訊息持久化,還需要生產者在傳送訊息的時候,
   將basicPublish()方法的BasicProperties props引數中deliveryMode設定為2【使用basicPublish(MessageProperties.PERSISTENT_BASIC)】,
   只有這3個全部設定完成後,才能保證伺服器重啟不會對現有的佇列造成影響。
   這裡需要注意的是,只有durable為true的Exchange和durable為ture的Queues才能繫結,
   否則在繫結時,RabbitMQ都會拋錯的。持久化會對RabbitMQ的效能造成比較大的影響,可能會下降10倍不止。

5. 事務

   對事務的支援是AMQP協議的一個重要特性。假設當生產者將一個持久化訊息傳送給伺服器時,
   因為consume命令本身沒有任何Response返回,所以即使伺服器崩潰,沒有持久化該訊息,生產者也無法獲知該訊息已經丟失。
   如果此時使用事務,即通過txSelect()開啟一個事務,然後傳送訊息給伺服器,然後通過txCommit()提交該事務,即可以保證,
   如果txCommit()提交了,則該訊息一定會持久化,如果txCommit()還未提交即伺服器崩潰,則該訊息不會伺服器就收。
   當然RabbitMQ也提供了txRollback()命令用於回滾某一個事務。

6. Confirm機制(Publisher Acknowledgements)

   使用事務固然可以保證只有提交的事務,才會被伺服器執行。但是這樣同時也將客戶端與訊息伺服器同步起來,
   這背離了訊息佇列解耦的本質。RabbitMQ提供了一個更加輕量級的機制來保證生產者可以感知伺服器訊息是否
   已被路由到正確的佇列中——Confirm。如果設定channel為confirm狀態,則通過該channel傳送的訊息都會被分配一個唯一的ID,
   然後一旦該訊息被正確的路由到匹配的佇列中後,伺服器會返回給生產者一個Confirm,該Confirm包含該訊息的ID,
   這樣生產者就會知道該訊息已被正確分發。對於持久化訊息,只有該訊息被持久化後,才會返回Confirm。
   Confirm機制的最大優點在於非同步,生產者在傳送訊息以後,即可繼續執行其他任務。而伺服器返回Confirm後,
   會觸發生產者的回撥函式,生產者在回撥函式中處理Confirm資訊。如果訊息伺服器發生異常,導致該訊息丟失,
   會返回給生產者一個nack,表示訊息已經丟失,這樣生產者就可以通過重發訊息,保證訊息不丟失。
   Confirm機制在效能上要比事務優越很多。但是Confirm機制,無法進行回滾,就是一旦伺服器崩潰,生產者無法得到Confirm資訊,
   生產者其實本身也不知道該訊息吃否已經被持久化,只有繼續重發來保證訊息不丟失,但是如果原先已經持久化的訊息,
   並不會被回滾,這樣佇列中就會存在兩條相同的訊息,系統需要支援去重。

Java客戶端Confirm Mode
import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmq.client.QueueingConsumer;

public class ConfirmDontLoseMessages {
    private final static int msgCount = 10000; // 預設傳送10000條訊息
    private final static String QUEUE_NAME = "confirm-test2"; // 佇列名稱
    private static ConnectionFactory connectionFactory;

    public static void main(String[] args)
            throws IOException, InterruptedException {
        connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("10.59.79.37");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setPort(5672);

        Thread consumerThread = new Thread(new Consumer());
        Thread publisherThread = new Thread(new Publisher());
        // 開啟消費者執行緒

        consumerThread.start();
        // 開啟生產者執行緒
        publisherThread.start();
    }

    // 訊息釋出者
    static class Publisher implements Runnable {
        public void run() {
            try {
                long startTime = System.currentTimeMillis();
                // 連線並非執行緒安全的,所以要每執行緒一個連線
                Connection conn = connectionFactory.newConnection();
                Channel ch = conn.createChannel();
                // 建立一個持久化的,非獨享,不自動刪除的佇列
                ch.queueDeclare(QUEUE_NAME, true, false, false, null);
                // 開啟通道上的 publisher acknowledgements
                ch.confirmSelect();

                // 傳送持久化訊息,訊息內容為helloWorld
                for (long i = 0; i < msgCount; ++i) {
                    ch.basicPublish("", QUEUE_NAME,
                            MessageProperties.PERSISTENT_BASIC,
                            "helloWorld".getBytes());
                }

                // 等待所有訊息都被ack或者nack,如果某個訊息被nack,則丟擲IOException
                ch.waitForConfirmsOrDie();
                long endTime = System.currentTimeMillis();
                System.out.printf("Test took %.3fs\n", (float) (endTime - startTime) / 1000);
                        
                // 刪除佇列,不論是否在使用中
                ch.queueDelete(QUEUE_NAME);
                ch.close();
                conn.close();
            } catch (Throwable e) {
                System.out.println("damn fuck! error detected :(");
                System.out.print(e);
            }
        }
    }

    // 訊息消費者
    static class Consumer implements Runnable {
        public void run() {
            try {
                // 每執行緒一個連線
                Connection conn = connectionFactory.newConnection();
                Channel ch = conn.createChannel();
                ch.queueDeclare(QUEUE_NAME, true, false, false, null);

                // 建立訊息消費者
                QueueingConsumer qc = new QueueingConsumer(ch);
                ch.basicConsume(QUEUE_NAME, true, qc);
                for (int i = 0; i < msgCount; ++i) {
                    qc.nextDelivery();
                }
                // 關閉通道和連線
                System.out.println("consumer done");
                ch.close();
                conn.close();
            } catch (Throwable e) {
                System.out.println("damn fuck! some error happened!");
                System.out.print(e);
            }
        }
    }

}

/** Wait until all messages published since the last call have
  * been either ack'd or nack'd by the broker.  If any of the
  * messages were nack'd, waitForConfirmsOrDie will throw an
  * IOException.  When called on a non-Confirm channel, it will
  * throw an IllegalStateException.
  * @throws java.lang.IllegalStateException
  */
void waitForConfirmsOrDie() throws IOException, InterruptedException;

原文:
http://www.360doc.com/content/14/0608/22/834950_384932402.shtml   RabbitMQ持久化、事務、訊息確認
http://www.tuicool.com/articles/bMJ7r2   RabbitMQ  Publisher的訊息確認機制