1. 程式人生 > >RabbitMQ從入門到精通(三)

RabbitMQ從入門到精通(三)

目錄

  • 1. 自定義消費者使用
    • 自定義消費端演示
  • 2.消費端的限流策略
    • 2.1 限流的場景與機制
    • 2.2 限流相關API
    • 2.3 限流演示
  • 3. 消費端ACK與重回佇列機制
    • 3.1 ACK與NACK
    • 3.2 重回佇列演示
  • 4. TTL
    • TTL演示
  • 5.死信佇列
    • 死信佇列演示

1. 自定義消費者使用

  • 我們之前呢都是在程式碼中編寫while迴圈,進行 consumer.nextDelivery 方法進行獲取下一條訊息,然後進行消費處理!
  • 其實我們還可以使用自定義的Consumer,它更加的方便,解耦性更加的強,也是在實際工作中最常用的使用方式!
  • 自定義消費端實現只需要繼承 DefaultConsumer 類,重寫 handleDelivery 方法即可

 

自定義消費端演示

public class Producer {
     public static void main(String[] args) throws Exception {
            //1 建立ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.244.11");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            connectionFactory.setHandshakeTimeout(20000);
            //2 獲取Connection
            Connection connection = connectionFactory.newConnection();
            //3 通過Connection建立一個新的Channel
            Channel channel = connection.createChannel();
            
            String exchange = "test_consumer_exchange";
            String routingKey = "consumer.save";
            
            String msg = "Hello RabbitMQ Consumer Message";
            //4 傳送訊息
            for(int i =0; i<5; i ++){
                channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
            }
        }
}

 

public class MyConsumer extends DefaultConsumer {

    public MyConsumer(Channel channel) {
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        //consumerTag: 內部生成的消費標籤  properties: 訊息屬性  body: 訊息內容  
        System.err.println("-----------consume message----------");
        System.err.println("consumerTag: " + consumerTag);
        //envelope包含屬性:deliveryTag(標籤), redeliver, exchange, routingKey
        //redeliver是一個標記,如果設為true,表示訊息之前可能已經投遞過了,現在是重新投遞訊息到監聽佇列的消費者
        System.err.println("envelope: " + envelope);
        System.err.println("properties: " + properties);
        System.err.println("body: " + new String(body));
    }
}

 

public class Consumer {
    public static void main(String[] args) throws Exception {
        //1 建立ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.244.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHandshakeTimeout(20000);
        //2 獲取Connection
        Connection connection = connectionFactory.newConnection();
        //3 通過Connection建立一個新的Channel
        Channel channel = connection.createChannel();
        
        String exchangeName = "test_consumer_exchange";
        String routingKey = "consumer.#";
        String queueName = "test_consumer_queue";
        //4 宣告交換機和佇列,然後進行繫結設定路由Key
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //5 設定channel,使用自定義消費者
        channel.basicConsume(queueName, true, new MyConsumer(channel));
    }
}

 

執行說明

先啟動消費端,訪問管控臺:http://ip:15672,檢查Exchange和Queue是否設定OK,然後啟動生產端。消費端列印內容如下

 

2.消費端的限流策略

2.1 限流的場景與機制

  • 假設一個場景,我們Rabbitmq伺服器有上萬條未處理的訊息,我們隨便開啟一個消費者客戶端,會出現這種情況:巨量的訊息瞬間全部推送過來,但是我們單個客戶端無法同時處理這麼多資料!此時很有可能導致伺服器崩潰,嚴重的可能導致線上的故障。
  • 除了這種場景,還有一些其他的場景,比如說單個生產者一分鐘生產出了幾百條資料,但是單個消費者一分鐘可能只能處理60條資料,這個時候生產端和消費端肯定是不平衡的。通常生產端是沒辦法做限制的。所以消費端肯定需要做一些限流措施,否則如果超出最大負載,可能導致消費端效能下降,伺服器卡頓甚至崩潰等一系列嚴重後果。

 

消費端限流機制

RabbitMQ提供了一種qos (服務質量保證)功能,即在非自動確認訊息的前提下,如果一定數目的訊息 (通過基於consume或者channel設定Qos的值) 未被確認前,不進行消費新的訊息。

需要注意:

1.不能設定自動簽收功能(autoAck = false)

2.如果訊息沒被確認,就不會到達消費端,目的就是給消費端減壓

 

2.2 限流相關API

限流設定 - BasicQos()

void BasicQos(uint prefetchSize, ushort prefetchCount, bool global);
prefetchSize: 單條訊息的大小限制,消費端通常設定為0,表示不做限制
prefetchCount: 一次最多能處理多少條訊息,通常設定為1
global: 是否將上面設定應用於channel,false代表consumer級別

注意事項

prefetchSizeglobal這兩項,rabbitmq沒有實現,暫且不研究
prefetchCountautoAck=false 的情況下生效,即在自動應答的情況下這個值是不生效的
 
手工ACK - basicAck()

void basicAck(Integer deliveryTag,boolean multiple)
手工ACK,呼叫這個方法就會主動回送給Broker一個應答,表示這條訊息我處理完了,你可以給我下一條了。引數multiple表示是否批量簽收,由於我們是一次處理一條訊息,所以設定為false

 

2.3 限流演示

生產端

生產端就是正常的邏輯

public class Producer {
    public static void main(String[] args) throws Exception {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.244.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHandshakeTimeout(20000);

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        String exchange = "test_qos_exchange";
        String routingKey = "qos.save";

        String msg = "Hello RabbitMQ QOS Message";
        // 傳送訊息
        for (int i = 0; i < 5; i++) {
            channel.basicPublish(exchange, routingKey, true, null,
                    msg.getBytes());
        }
    }
}

 

自定義消費者

為了看到限流效果,這裡不進行ACK

public class MyConsumer extends DefaultConsumer {

    //接收channel
    private Channel channel ;
    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("consumerTag: " + consumerTag);
        System.err.println("envelope: " + envelope);
        //System.err.println("properties: " + properties);
        System.err.println("body: " + new String(body));
        //手工ACK,引數multiple表示不批量簽收
        //channel.basicAck(envelope.getDeliveryTag(), false);   
    }
}

 

消費端

關閉autoACK,進行限流設定

public class Consumer {

    public static void main(String[] args) throws Exception {
        //1 建立ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.244.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHandshakeTimeout(20000);
        //2 獲取Connection
        Connection connection = connectionFactory.newConnection();
        //3 通過Connection建立一個新的Channel
        Channel channel = connection.createChannel();
        
        String exchangeName = "test_qos_exchange";
        String queueName = "test_qos_queue";
        String routingKey = "qos.#";
        //4 宣告交換機和佇列,然後進行繫結設定路由Key
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //進行引數設定:單條訊息的大小限制,一次最多能處理多少條訊息,是否將上面設定應用於channel
        channel.basicQos(0, 1, false);
        
        //限流: autoAck設定為 false
        channel.basicConsume(queueName, false, new MyConsumer(channel));
    }
}

 

執行說明

我們先註釋掉手工ACK方法,然後啟動消費端和生產端,此時消費端只打印了一條訊息

這是因為我們設定了手工簽收,並且設定了一次只處理一條訊息,當我們沒有回送ack應答時,Broker端就認為消費端還沒有處理完這條訊息,基於這種限流機制就不會給消費端傳送新的訊息了,所以消費端只打印了一條訊息。

通過管控臺也可以看到佇列總共收到了5條訊息,有一條訊息沒有ack。

將手工簽收程式碼取消註釋,再次執行消費端,此時就會列印5條訊息的內容。

 

3. 消費端ACK與重回佇列機制

3.1 ACK與NACK

當我們設定 autoACK=false 時,就可以使用手工ACK方式了,那麼其實手工方式包括了手工ACK與NACK。

當我們手工 ACK 時,會發送給Broker一個應答,代表訊息成功處理了,Broker就可以回送響應給生產端了。NACK 則表示訊息處理失敗了,如果設定重回佇列,Broker端就會將沒有成功處理的訊息重新發送。

 

使用方式

  1. 消費端進行消費的時候,如果由於業務異常我們可以手工 NACK 並進行日誌的記錄,然後進行補償!
    方法:void basicNack(long deliveryTag, boolean multiple, boolean requeue)
  2. 如果由於伺服器宕機等嚴重問題,那我們就需要手工進行 ACK 保障消費端消費成功!
    方法:void basicAck(long deliveryTag, boolean multiple)

 

3.2 重回佇列演示

  • 消費端重回佇列是為了對沒有處理成功的訊息,把訊息重新會遞給Broker!
  • 重回佇列,會把消費失敗的訊息重新新增到佇列的尾端,供消費者繼續消費。
  • 一般我們在實際應用中,都會關閉重回佇列,也就是設定為false

 

生產端

對訊息設定自定義屬性以便進行區分

public class Producer {

    public static void main(String[] args) throws Exception {
        //1 建立ConnectionFactorys
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.244.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHandshakeTimeout(20000);
        //2 獲取Connection
        Connection connection = connectionFactory.newConnection();
        //3 通過Connection建立一個新的Channel
        Channel channel = connection.createChannel();
        
        String exchange = "test_ack_exchange";
        String routingKey = "ack.save";
        
        for(int i =0; i<5; i ++){
            //設定訊息屬性
            Map<String, Object> headers = new HashMap<String, Object>();
            headers.put("num", i);
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2)
                    .contentEncoding("UTF-8")
                    .headers(headers)
                    .build();
            //傳送訊息
            String msg = "Hello RabbitMQ ACK Message " + i;
            channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
        }   
    }
}

 

自定義消費

對第一條訊息進行NACK,並設定重回佇列

public class MyConsumer extends DefaultConsumer {

    private Channel channel ;

    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("body: " + new String(body));
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if((Integer)properties.getHeaders().get("num") == 0) {
            //NACK,引數三requeue:是否重回佇列
            channel.basicNack(envelope.getDeliveryTag(), false, true);
        } else {
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    }
}

 

消費端

關閉自動簽收功能

public class Consumer {
    
    public static void main(String[] args) throws Exception {
        
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.244.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHandshakeTimeout(20000);
        
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        
        String exchangeName = "test_ack_exchange";
        String queueName = "test_ack_queue";
        String routingKey = "ack.#";
        //宣告交換機和佇列,然後進行繫結設定路由Key
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //手工簽收 必須要設定 autoAck = false
        channel.basicConsume(queueName, false, new MyConsumer(channel));
    }
}

 

執行說明

先啟動消費端,然後啟動生產端,消費端列印如下,顯然第一條訊息由於我們呼叫了NACK,並且設定了重回佇列,所以會導致該條訊息一直重複傳送,消費端就會一直迴圈消費。

 

一般工作中不會設定重回佇列這個屬性,都是自己去做補償或者投遞到延遲佇列裡的,然後指定時間去處理即可。

 

4. TTL

TTL說明

  • TTL是Time To Live的縮寫,也就是生存時間
  • RabbitMQ支援訊息的過期時間,在訊息傳送時可以進行指定
  • RabbitMQ支援為每個佇列設定訊息的超時時間,從訊息入佇列開始計算,只要超過了佇列的超時時間配置,那麼訊息會自動的清除

 

TTL演示

這次演示我們不寫程式碼,只通過管控臺進行操作,實際測試也會更為方便一些。
 

1. 建立Exchange

選擇Exchange選單,找到下面的Add a new exchange

 

2.建立Queue

選擇Queue選單,找到下面的Add a new queue

 

3.建立佇列和交換機的繫結關係

點選Exchange表格中的test002_exchange,在下面新增繫結規則

 

4.傳送訊息

點選Exchange表格中的test002_exchange,在下面找到Publish message,設定訊息進行傳送

 

5.驗證

點選Queue選單,查看錶格中test002已經有了一條訊息,10秒後表格顯示0條,說明過期時間到了訊息被自動清除了。

 

6.設定單條訊息過期時間

點選Exchange表格中的test002_exchange,在下面找到Publish message,設定訊息的過期時間並進行傳送,此時觀察test002佇列,發現訊息5s後就過期被清除了,即使佇列設定的過期時間是10s。

 
TTL程式碼設定過期時間

        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .expiration("10000") //10s過期
                .build();
        //傳送訊息
        channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());

 

佇列過期時間設定

        //設定佇列的過期時間10s
        Map<String,Object> param = new HashMap<>();
        param.put("x-message-ttl", 10000);
        //宣告佇列
        channel.queueDeclare(queueName, true, false, false, null);

 
注意事項

  1. 兩者的區別是設定佇列的過期時間是對該佇列的所有訊息生效的。
  2. 為訊息設定TTL有一個問題:RabbitMQ只對處於隊頭的訊息判斷是否過期(即不會掃描佇列),所以,很可能佇列中已存在死訊息,但是佇列並不知情。這會影響佇列統計資料的正確性,妨礙佇列及時釋放資源。

 

5.死信佇列

死信佇列介紹

  • 死信佇列:DLX,dead-letter-exchange
  • 利用DLX,當訊息在一個佇列中變成死信 (dead message) 之後,它能被重新publish到另一個Exchange,這個Exchange就是DLX

 

訊息變成死信有以下幾種情況

  • 訊息被拒絕(basic.reject / basic.nack),並且requeue = false
  • 訊息TTL過期
  • 佇列達到最大長度

 

死信處理過程

  • DLX也是一個正常的Exchange,和一般的Exchange沒有區別,它能在任何的佇列上被指定,實際上就是設定某個佇列的屬性。
  • 當這個佇列中有死信時,RabbitMQ就會自動的將這個訊息重新發布到設定的Exchange上去,進而被路由到另一個佇列。
  • 可以監聽這個佇列中的訊息做相應的處理。

 

死信佇列設定

  1. 首先需要設定死信佇列的exchange和queue,然後進行繫結:

  1. 然後需要有一個監聽,去監聽這個佇列進行處理
  2. 然後我們進行正常宣告交換機、佇列、繫結,只不過我們需要在佇列加上一個引數即可:arguments.put(" x-dead-letter-exchange","dlx.exchange");,這樣訊息在過期、requeue、 佇列在達到最大長度時,訊息就可以直接路由到死信佇列!

 

死信佇列演示

生產端

public class Producer {
    public static void main(String[] args) throws Exception {
        //1 建立ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.244.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHandshakeTimeout(20000);
        //2 獲取Connection
        Connection connection = connectionFactory.newConnection();
        //3 通過Connection建立一個新的Channel
        Channel channel = connection.createChannel();
        
        String exchange = "test_dlx_exchange";
        String routingKey = "dlx.save";
        
        String msg = "Hello RabbitMQ DLX Message";
        
        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .deliveryMode(2)
                .contentEncoding("UTF-8")
                .expiration("10000")
                .build();
        //傳送訊息
        channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
    }
}

 

自定義消費者

public class MyConsumer extends DefaultConsumer {

    public MyConsumer(Channel channel) {
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("consumerTag: " + consumerTag);
        System.err.println("envelope: " + envelope);
        System.err.println("properties: " + properties);
        System.err.println("body: " + new String(body));
    }
}

 

消費端

  • 宣告正常處理訊息的交換機、佇列及繫結規則
  • 在正常交換機上指定死信傳送的Exchange
  • 宣告死信交換機、佇列及繫結規則
  • 監聽死信佇列,進行後續處理,這裡省略
public class Consumer {
    public static void main(String[] args) throws Exception {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.244.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHandshakeTimeout(20000);

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        // 宣告一個普通的交換機 和 佇列 以及路由
        String exchangeName = "test_dlx_exchange";
        String routingKey = "dlx.#";
        String queueName = "test_dlx_queue";
        String deadQueueName = "dlx.queue";
        
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        // 指定死信傳送的Exchange
        Map<String, Object> agruments = new HashMap<String, Object>();
        agruments.put("x-dead-letter-exchange", "dlx.exchange");
        // 這個agruments屬性,要設定到宣告佇列上
        channel.queueDeclare(queueName, true, false, false, agruments);
        channel.queueBind(queueName, exchangeName, routingKey);

        // 要進行死信佇列的宣告
        channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
        channel.queueDeclare(deadQueueName, true, false, false, null);
        channel.queueBind(deadQueueName, "dlx.exchange", "#");

        channel.basicConsume(queueName, true, new MyConsumer(channel));
        //channel.basicConsume(deadQueueName, true, new MyConsumer(channel));

    }
}

 

執行說明

啟動消費端,此時檢視管控臺,新增了兩個Exchange,兩個Queue。在test_dlx_queue上我們設定了DLX,也就代表死信訊息會發送到指定的Exchange上,最終其實會路由到dlx.queue上。

此時關閉消費端,然後啟動生產端,檢視管控臺佇列的訊息情況,test_dlx_queue的值為1,而dlx_queue的值為0。
10s後的佇列結果如圖,由於生產端傳送訊息時指定了訊息的過期時間為10s,而此時沒有消費端進行消費,訊息便被路由到死信佇列中。

實際環境我們還需要對死信佇列進行一個監聽和處理,當然具體的處理邏輯和業務相關,這裡只是簡單演示死信佇列是否生