1. 程式人生 > >RabbitMQ訊息釋出時的權衡

RabbitMQ訊息釋出時的權衡

在進行本篇文章的學習之前,你需要先閱讀https://www.cnblogs.com/duanjt/p/10057330.html。以便對Java訪問RabbitMQ的基礎用法有所瞭解。

一、失敗通知

基於前面的講解,如果訊息通過交換器傳送到指定的路由鍵,而這個路由鍵卻沒有被佇列繫結,那麼這條訊息就會被丟棄。從這個角度來說訊息的可靠性就比較低。為了增強可靠性,於是引入了失敗通知的機制。在生產者傳送訊息到RabbitMQ的時候,如果路由鍵沒有被佇列繫結就將回調一個函式,讓消費者能夠知曉資料傳送失敗,從而做一些處理(注意:傳送成功不會回撥)。
具體程式碼如下:

 

public class Productor {
    
public static String EXCHANGE_NAME = "zd_exchange"; public static String ROUTEKEY = "zd_data_list1"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("172.23.88.116"); factory.setPort(
5672); factory.setUsername("zhangxueliang"); factory.setPassword("zhangxueliang"); factory.setVirtualHost("/"); Connection conn = factory.newConnection(); Channel channel = conn.createChannel(); // 建立交換器 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 建立失敗通知 channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("ERROR DATA:" + new String(body)); System.out.println("replyText:" + replyText); System.out.println("replyCode:" + replyCode); System.out.println("================"); } }); // 傳送資料 for (int i = 0; i < 3; i++) { String msg = "Hello world " + i; channel.basicPublish(EXCHANGE_NAME, ROUTEKEY, true, null, msg.getBytes()); System.out.println("send:" + msg); } Thread.sleep(2000);//暫停2秒,以便接收回調。因為通道關閉後就不能接收到回撥訊息了 channel.close(); conn.close(); } }

 

注意:

1.我們需要在傳送訊息的時候指定mandatory=true(channel.basicPublish的第三個引數)
2.我們需要通過channel增加一個返回監聽,replyText表示錯誤原因,body表示傳送的訊息內容。

 

二、事務


1.channel.txSelect()宣告啟動事務模式;
2.channel.txComment()提交事務;
3.channel.txRollback()回滾事務;
說明:由於事務會嚴重影響效率,所以我們一般不使用,而是用傳送方確認模式來保證資料的可靠性。

 

三、傳送方確認模式


傳送方確認模式和事務還是有一定的區別
比如傳送10條資料--
事務能保證在第5條傳送錯誤的時候就回滾,RabbitMQ裡面沒有一條資料。
而傳送者確認模式在第5條傳送錯誤的時候會丟擲一個異常,生產者就能抓住這個異常從而得知沒有全部成功,但是已經發送的資料卻已經存在於RabbitMQ了。
傳送方確認有3種方式。2種同步,1種非同步
方式一:channel.waitForConfirms()普通傳送方確認模式;訊息到達交換器,就會返回true。
方式二:channel.waitForConfirmsOrDie()批量確認模式;使用同步方式等所有的訊息傳送之後才會執行後面程式碼,只要有一個訊息未到達交換器就會丟擲IOException異常。
方式三:channel.addConfirmListener()非同步監聽傳送方確認模式
【注意】必須要先啟動傳送者確認模式channel.confirmSelect();
具體程式碼:

public class Productor {
    public static String EXCHANGE_NAME = "zd_exchange";
    public static String ROUTEKEY = "zd_data_list";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("172.23.88.116");
        factory.setPort(5672);
        factory.setUsername("zhangxueliang");
        factory.setPassword("zhangxueliang");
        factory.setVirtualHost("/");

        Connection conn = factory.newConnection();
        Channel channel = conn.createChannel();

        // 建立交換器
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        // 建立失敗通知
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("handleReturn==>" + replyText);
            }
        });

        // 非同步確認
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("handleNack==>" + deliveryTag + "----" + multiple);
            }

            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("handleAck==>" + deliveryTag + "----" + multiple);
            }
        });

        // 啟用傳送確認模式
        channel.confirmSelect();

        // 傳送資料
        String[] routekeys = new String[] { "zd_data_list", "zd_data_list2", "zd_data_list3" };
        for (int i = 0; i < 3; i++) {
            String msg = "Hello world " + i;
            channel.basicPublish(EXCHANGE_NAME, routekeys[i % 3], true, null, msg.getBytes());
            System.out.println("send:" + msg);

            // 同步單條確認
            //channel.waitForConfirms();
        }
        // 同步批量確認
        // channel.waitForConfirmsOrDie();

        Thread.sleep(2000);// 暫停2秒,以便接收回調。因為通道關閉後就不能接收到回撥訊息了

        channel.close();
        conn.close();
    }
}

 

注意:

1.傳送者確認模式只是告訴生產者是否傳送成功,至於成功和失敗後的具體處理還是程式碼自己實現。
2.同步單條確認channel.waitForConfirms()會返回是否成功。同步批量確認channel.waitForConfirmsOrDie()可通過異常來判斷是否成功。
3.非同步確認channel.addConfirmListener()。有兩個引數(long deliveryTag, boolean multiple)。第一個引數表示訊息的Id,第二個引數表示是否為批量。如果第二個引數為true,那麼編號<=deliveryTag的所有訊息都確認了。
4.傳送者確認模式和失敗通知可以一起使用。無論訊息的路由鍵是否有佇列繫結,都會返回Ack表示成功。而如果沒有佇列繫結路由鍵,同時又啟用了失敗通知,那麼還會在呼叫channel.addConfirmListener()之前去呼叫channel.addReturnListener()

 

四、備用交換器


備用交換器從本質上說也就是一個普通交換器。當訊息傳送到普通交換器而沒有佇列繫結路由鍵的時候。該訊息就會發送到備用交換器上面。如果備用交換器上面也沒得佇列繫結,那麼這條訊息也將被丟棄(如果配置了失敗通知,那麼將回調失敗通知方法channel.addReturnListener())
核心程式碼:

// 建立主交換器
Map<String, Object> argsMap = new HashMap<String, Object>();
argsMap.put("alternate-exchange", BACK_EXCHANGE_NAME);// 定義主交換器的備用交換器名稱
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false, false, argsMap);
// 建立備用交換器
channel.exchangeDeclare(BACK_EXCHANGE_NAME, BuiltinExchangeType.FANOUT);