1. 程式人生 > >延時佇列

延時佇列

這節延時佇列就是乘著上節的鎖衝突處理的延時佇列來的。

 

但是在此之前我要講一下我們如何編輯python檔案,因為總是寫在命令列不能儲存程式碼,也不能修改。

我們可以先vi  檔名.py這樣就建立了一個.py檔案

然後我們到這個檔案所在的位置開啟這個檔案就可以編輯了

儲存後在命令列到達檔案所在的位置輸入python 檔名.py就運行了剛才編輯的python程式碼

 

 redis的訊息佇列不是專業的訊息佇列,它沒有非常多的高階特性,沒有ack保證,如果對訊息的可靠性有著極致的追求,那麼它就不適合使用。

redis的list資料結構常用來作為非同步訊息佇列使用,客戶端是通過佇列的pop操作來獲取訊息然後進行處理。處理完了再接著獲取訊息,再進行處理,如此迭代,這就是作為佇列消費者的客戶端宣告週期。但是如果佇列空了,客戶端會陷入pop的死迴圈,會浪費時間與空間。佇列延遲指令可以有效的解決這一情況。blpop、brpop,這兩個指令的字首b代表的是blocking,也就是阻塞讀。阻塞讀在佇列沒有資料的時候會進入休眠狀態,一旦資料到來,則立刻醒過來。訊息延遲幾乎為零。但是這裡有一個問題,執行緒一直阻塞在那裡,redis的客戶端連線閒置太久,伺服器一般會主動斷開連線,減少閒置資源佔用。這時blpop、brpop會丟擲異常,所以編寫客戶端消費者的時候要小心,注意捕獲異常,還要重試。

延時佇列可以通過redis的zset(有序列表)來實現。我們將訊息序列化一個字串作為zset的value這個訊息的到期時間作為score,然後用多個執行緒輪詢zset獲取到期的任務進行處理,多個執行緒是為了保障可用性,萬一掛了一個執行緒還有其他執行緒可以繼續處理。因為有多個執行緒,所以需要考慮併發爭搶任務,確保任務不能被多次執行。

 

def delay(msg):
    msg.id=str(uuid.uuid4())
    value=json.dumps(msg)
    retry_ts=time.time()+5
    redis.zadd("delay-queue
",retry_ts,value) def loop(): while True: values=redis.zrangebyscore("delay-queue",0,time.time(),start=0,num=1) if not value: time.sleep(1) continue value=values[0] success=redis.zrem("delay-queue",value) if success: msg
=json.loads(value) handle_msg(msg)

 

redis的zrem方法是多執行緒多程序爭搶任務的關鍵,它的返回值決定了當前例項有沒有搶到任務,因為loop方法可能會被多個程序多個執行緒呼叫,同一個任務可能會被多個程序執行緒搶到,通過zrem來決定唯一的屬主。同時我們要注意對handle_msg進行異常捕獲,避免因為個別任務處理問題導致迴圈異常退出。