1. 程式人生 > >分散式延遲訊息佇列實現分析與設計

分散式延遲訊息佇列實現分析與設計

介紹

延遲佇列,顧名思義它是一種帶有延遲功能的訊息佇列。 那麼,是在什麼場景下我才需要這樣的佇列呢?

很多時候我們會有延時處理一個任務的需求,比如說:

2個小時後給使用者傳送簡訊。
15分鐘後關閉網路連線。
2分鐘後再次嘗試回撥。

下面我們來分別探討一下幾種實現方案:

Java中的DelayQueue

Java中的DelayQueue位於java.util.concurrent包下,本質是由PriorityQueue和BlockingQueue實現的阻塞優先順序佇列。

放入佇列的元素需要實現Delayed介面:

public interface Delayed extends Comparable<Delayed> {
    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}
通過實現這個介面,來完成對佇列中元素,按照時間延遲先後排序的目的。

從佇列中取元素:

/**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    else if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }
可以看到,在這段程式碼裡,在第一個元素的延遲時間還沒到的情況下:
  • 如果當前沒有其他執行緒等待,則阻塞當前執行緒直到延遲時間。
  • 如果有其他執行緒在等待,則阻塞當前執行緒。
向佇列中放入元素:
/**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return <tt>true</tt>
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
在放入元素的時候,會喚醒等待中的讀執行緒。

如果我們不考慮分散式執行和任務持久化的話,Java中的DelayQueue是一個很理想的方案,精巧好用。但是如果我們需要分散式執行和任務持久化,就需要引入一些外部元件。

使用Redis實現

前文我們看到,可以通過優先順序佇列來實現延遲佇列的功能。Redis提供了很多資料結構,其中的zset是一種有序的資料結構;我們可以通過Redis中的zset來實現一個延遲佇列。

基本的方法就是使用時間戳作為元素的score存入zset。

redis> ZADD delayqueue <future_timestamp> "messsage"  
獲取所有已經“就緒”的message,並且刪除message。

redis> MULTI  
redis> ZRANGEBYSCORE delayqueue 0 <current_timestamp>  
redis> ZREMRANGEBYSCORE delayqueue 0 <current_timestamp>  
redis> EXEC
但是這個方案也有一些問題:

Redis事務雖然保證了一致性和隔離性,但是並沒有提供回滾功能。訊息處理失敗是不能被恢復的,如果處理某條訊息的執行緒崩潰或機器宕機,這條未被處理不能被自動的再次處理。

也有考慮過將分為TODO和Doing兩條佇列:

先從TODO佇列中取出任務,放入Doing中,再開始處理;如果停留在Doing佇列總過久,則重新放入TODO佇列。但是由於Redis的事務特性,並不能做到完全可靠;並且檢查Doing超時的邏輯也略複雜。那麼有沒有一個成熟的訊息佇列可以支援延遲投遞訊息的功能呢?答案當然是有的,本文的標題就是使用RabbitMQ實現DelayQueue。

使用RabbitMQ實現

這是RabbitMQ眾多隱藏的強大特性中的一個,可以輕鬆的降低程式碼的複雜度,實現DelayQueue的功能。

我們需要兩個佇列,一個用來做主佇列,真正的投遞訊息;另一個用來延遲處理訊息。

ConnectionFactory factory = new ConnectionFactory();  
factory.setHost(host);  
factory.setPort(port);  
connection = factory.newConnection();  
channel = connection.createChannel();

channel.queueDeclare("MAIN_QUEUE", true, false, false, null);  
channel.queueBind("MAIN_QUEUE", "amq.direct", "MAIN_QUEUE");

HashMap<String, Object> arguments = new HashMap<String, Object>();  
arguments.put("x-dead-letter-exchange", "amq.direct");  
arguments.put("x-dead-letter-routing-key", "MAIN_QUEUE");  
channel.queueDeclare("DELAY_QUEUE", true, false, false, arguments);  
放入延遲訊息:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();  
AMQP.BasicProperties properties = builder.expiration(String.valueOf(task.getDelayMillis())).deliveryMode(2).build();  
channel.basicPublish("", "DELAY_QUEUE", properties, SerializationUtils.serialize(task));  
而關鍵點,就在於 x-dead-letter-exchange 和 x-dead-letter-routing-key 兩個引數上。這兩個引數說明了:訊息過期後的處理方式 --> 投遞到我們指定的MAIN_QUEUE;然後我們只需要在MAIN_QUEUE中等待訊息投遞即可。

RabbitMQ本身提供了訊息持久化和沒有收到ACK的重投遞功能,這樣我們就可以實現一個高可靠的分散式延遲訊息隊列了。

PS:上面講述的RabbitMQ定時任務方案有問題,RabbitMQ TTL文件 中寫道:

Caveats

While consumers never see expired messages, only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered). When setting a per-queue TTL this is not a problem, since expired messages are always at the head of the queue. When setting per-message TTL however, expired messages can queue up behind non-expired ones until the latter are consumed or expired. Hence resources used by such expired messages will not be freed, and they will be counted in queue statistics (e.g. the number of messages in the queue).

per-queue TTL不會有問題,因為快要過期的訊息總是在佇列的前邊;但是如果使用per-message TTL的話,過期的訊息有可能會在未過期的訊息後邊,直到前邊的訊息過期或者被消費。因為RabbitMQ保證過期的訊息一定不會被消費者消費,但是不能保證訊息過期就會從佇列中移除。

ActiveMQ

ActiveMQ from version 5.4 has an optional persistent scheduler built into the ActiveMQ message broker.

可以支援定時、延遲投遞、重複投遞和Cron排程。

在配置檔案中,啟用<broker ... schedulerSupport="true"> 選項後即可使用。

MessageProducer producer = session.createProducer(destination);  
TextMessage message = session.createTextMessage("test msg");  
long delay = 30 * 1000;  
long period = 10 * 1000;  
int repeat = 9;  
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);  
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);  
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);  
producer.send(message);  
MessageProducer producer = session.createProducer(destination);  
TextMessage message = session.createTextMessage("test msg");  
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");  
producer.send(message); 
ActiveMQ配置項介紹:

Property                                        type                                  Description

AMQ_SCHEDULED_DELAY         false                   The time in milliseconds that a message will wait before

                                                                                being scheduled to be delivered by the broker

AMQ_SCHEDULED_DELAY         false                   訊息延遲傳送的延遲時間(單位毫秒)                                                                                

AMQ_SCHEDULED_PERIOD      false                     The time in milliseconds after the start time to wait before

                                                                                scheduling the message again

AMQ_SCHEDULED_PERIOD      false                     代理啟動後,傳送訊息之前的等待時間(單位毫秒).                                                                                

AMQ_SCHEDULED_REPEAT      false                     The number of times to repeat scheduling a message for delivery

AMQ_SCHEDULED_REPEAT      false                     排程訊息傳送的重複次數

AMQ_SCHEDULED_CRON        String                    Use a cron entry to set the schedule

AMQ_SCHEDULED_CRON        String                    使用一個cron實體設定訊息傳送排程

文章引自:http://zhangyp.net/rabbitmq-delayqueue/