1. 程式人生 > >rabbitmq 實現延遲佇列的兩種方式

rabbitmq 實現延遲佇列的兩種方式

轉載請註明出處

ps: 文章裡面延遲佇列=延時佇列

什麼是延遲佇列

延遲佇列儲存的物件肯定是對應的延時訊息,所謂”延時訊息”是指當訊息被髮送以後,並不想讓消費者立即拿到訊息,而是等待指定時間後,消費者才拿到這個訊息進行消費。

場景一:在訂單系統中,一個使用者下單之後通常有30分鐘的時間進行支付,如果30分鐘之內沒有支付成功,那麼這個訂單將進行一場處理。這是就可以使用延時佇列將訂單資訊傳送到延時佇列。

場景二:使用者希望通過手機遠端遙控家裡的智慧裝置在指定的時間進行工作。這時候就可以將使用者指令傳送到延時佇列,當指令設定的時間到了再將指令推送到只能裝置。

RabbitMQ如何實現遲佇列

方法一

AMQP協議和RabbitMQ佇列本身沒有直接支援延遲佇列功能,但是可以通過以下特性模擬出延遲佇列的功能。
但是我們可以通過RabbitMQ的兩個特性來曲線實現延遲佇列:

RabbitMQ可以針對Queue設定x-expires 或者 針對Message設定 x-message-ttl,來控制訊息的生存時間,如果超時(兩者同時設定以最先到期的時間為準),則訊息變為dead letter(死信)

RabbitMQ針對佇列中的訊息過期時間有兩種方法可以設定。

  • A: 通過佇列屬性設定,佇列中所有訊息都有相同的過期時間。
  • B: 對訊息進行單獨設定,每條訊息TTL可以不同。

如果同時使用,則訊息的過期時間以兩者之間TTL較小的那個數值為準。訊息在佇列的生存時間一旦超過設定的TTL值,就成為dead letter

RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可選)兩個引數,如果佇列內出現了dead letter,則按照這兩個引數重新路由轉發到指定的佇列。

  • x-dead-letter-exchange:出現dead letter之後將dead letter重新發送到指定exchange
  • x-dead-letter-routing-key:出現dead letter之後將dead letter重新按照指定的routing-key傳送

隊列出現dead letter的情況有:

  • 訊息或者佇列的TTL過期

  • 佇列達到最大長度

  • 訊息被消費端拒絕(basic.reject or basic.nack)並且requeue=false

綜合上述兩個特性,設定了TTL規則之後當訊息在一個佇列中變成死信時,利用DLX特性它能被重新轉發到另一個Exchange或者Routing Key,這時候訊息就可以重新被消費了。

設定方法:

第一步:設定TTL產生死信,有兩種方式Per-Message TTL和 Queue TTL,第一種可以針對每一條訊息設定一個過期時間使用於大多數場景,第二種針對佇列設定過期時間、適用於一次性延時任務的場景

還有其他產生死信的方式比如消費者拒絕消費 basic.reject 或者 basic.nack ( 前提要設定消費者的屬性requeue=false)
- Per-Message TTL (對每一條訊息設定一個過期時間)(官方文件

java client傳送一條只能駐留60秒的訊息到佇列:

byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties();
properties.setExpiration("60000");//設定訊息的過期時間為60秒
channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes);
//這條訊息傳送到相應的佇列之後,如果60秒內沒有被消費,則變為死信
  • Queue TTL (對整個佇列設定一個過期時間)

建立一個佇列,佇列的訊息過期時間為30分鐘(這個佇列30分鐘內沒有消費者消費訊息則刪除,刪除後佇列內的訊息變為死信)

java client方式:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-expires", 1800000);
channel.queueDeclare("myqueue", false, false, false, args);

rabbitmqctl命令方式(.* 為所有佇列, 可以替換為指定佇列):
rabbitmqctl set_policy expiry ".*" '{"expires":1800000}' --apply-to queues

rabbitmqctl (Windows):
rabbitmqctl set_policy expiry ".*" "{""expires"":1800000}" --apply-to queues

第二步:設定死信的轉發規則(如果沒有任何規則,則直接丟棄死信)
- Dead Letter Exchanges設定方法(官方文件

Java Client方式:
//宣告一個直連模式的exchange
channel.exchangeDeclare("some.exchange.name", "direct");
//宣告一個佇列,當myqueue佇列中有死信產生時,會轉發到交換器some.exchange.name
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");

//如果設定死信會以路由鍵some-routing-key轉發到some.exchange.name,如果沒設預設為訊息傳送到本佇列時用的routing key
//args.put("x-dead-letter-routing-key", "some-routing-key");
channel.queueDeclare("myqueue", false, false, false, args);

命令列方式(.* 為所有佇列, 可以替換為指定佇列):
設定 "dead-letter-exchange"
rabbitmqctl:
rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange":"my-dlx"}' --apply-to queues
rabbitmqctl (Windows):
rabbitmqctl set_policy DLX ".*" "{""dead-letter-exchange"":""my-dlx""}" --apply-to queues
設定 "dead-letter-routing-key"
rabbitmqctl:
rabbitmqctl set_policy DLX ".*" '{ "dead-letter-routing-key":"my-routing-key"}' --apply-to queues
rabbitmqctl (Windows):
rabbitmqctl set_policy DLX ".*" "{""dead-letter-routing-key"":""my-routing-key""}" --apply-to queues

方法二

在rabbitmq 3.5.7及以上的版本提供了一個外掛(rabbitmq-delayed-message-exchange)來實現延遲佇列功能。同時外掛依賴Erlang/OPT 18.0及以上。

安裝:

進入外掛安裝目錄
{rabbitmq-server}/plugins/(可以檢視一下當前已存在的外掛)
下載外掛
rabbitmq_delayed_message_exchange

wget https://bintray.com/rabbitmq/community-plugins/download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez

(如果下載的檔名稱不規則就手動重新命名一下如:
rabbitmq_delayed_message_exchange-0.0.1.ez)

啟用外掛

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

(關閉外掛)
rabbitmq-plugins disable rabbitmq_delayed_message_exchange

外掛使用

通過宣告一個x-delayed-message型別的exchange來使用delayed-messaging特性
x-delayed-message是外掛提供的型別,並不是rabbitmq本身的

// ... elided code ...
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
// ... more code ...

傳送訊息的時候通過在header新增”x-delay”引數來控制訊息的延時時間

// ... elided code ...
byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8");
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);
// ... more code ...

使用示例:

訊息傳送端:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send {
    // 佇列名稱
    private final static String EXCHANGE_NAME="delay_exchange";
    private final static String ROUTING_KEY="key_delay";

    @SuppressWarnings("deprecation")
    public static void main(String[] argv) throws Exception {
        /**
         * 建立連線連線到MabbitMQ
         */
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.12.190");
        factory.setUsername("admin");
        factory.setPassword("admin");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

        // 宣告x-delayed-type型別的exchange
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-delayed-type", "direct");
        channel.exchangeDeclare(EXCHANGE_NAME, "x-delayed-message", true,
                false, args);


        Map<String, Object> headers = new HashMap<String, Object>();
        //設定在2016/11/04,16:45:12向消費端推送本條訊息
        Date now = new Date();
        Date timeToPublish = new Date("2016/11/04,16:45:12");

        String readyToPushContent = "publish at " + sf.format(now)
                + " \t deliver at " + sf.format(timeToPublish);

        headers.put("x-delay", timeToPublish.getTime() - now.getTime());

        AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder()
                .headers(headers);
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, props.build(),
                readyToPushContent.getBytes());

        // 關閉頻道和連線
        channel.close();
        connection.close();
    }
}

訊息接收端:

import java.text.SimpleDateFormat;
import java.util.Date;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {

    // 佇列名稱
    private final static String QUEUE_NAME = "delay_queue";
    private final static String EXCHANGE_NAME="delay_exchange";

    public static void main(String[] argv) throws Exception,
            java.lang.InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.12.190");
        factory.setUsername("admin");
        factory.setPassword("admin");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

        channel.queueDeclare(QUEUE_NAME, true,false,false,null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        channel.basicConsume(QUEUE_NAME, true, queueingConsumer);
        SimpleDateFormat sf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        try {
            System.out.println("****************WAIT***************");
            while(true){
                QueueingConsumer.Delivery delivery = queueingConsumer
                        .nextDelivery(); //

                String message = (new String(delivery.getBody()));
                System.out.println("message:"+message);
                System.out.println("now:\t"+sf.format(new Date()));
            }

        } catch (Exception exception) {
            exception.printStackTrace();

        }

    }
}

啟動接收端,啟動傳送端
執行結果:

****************WAIT***************
message:publish at 2016-11-04 16:44:16.887   deliver at 2016-11-04 16:45:12.000
now:    2016-11-04 16:45:12.023

結果顯示在我們2016-11-04 16:45:12.023接收到了訊息,距離我們設定的時間2016-11-04 16:45:12.023有23毫秒的延遲

Note:使用rabbitmq-delayed-message-exchange外掛時傳送到佇列的訊息數量在web管理介面可能不可見,不影響正常功能使用

Note :使用過程中發現,當一臺啟用了rabbitmq-delayed-message-exchange外掛的RAM節點在重啟的時候會無法啟動,檢視日誌發現了一個Timeout異常,開發者解釋說這是節點在啟動過程會同步叢集相關資料造成啟動超時,並建議不要使用Ram節點

外掛開發者:
RAM nodes start blank and need a disk node to sync tables from. In this case it times out.

More importantly, you don’t need RAM nodes. If you’re not sure if you do, you certainly don’t, as don’t 99% of users.