1. 程式人生 > >redis 在業務層面的應用之定時器

redis 在業務層面的應用之定時器

    前幾天出去面試,大家都喜歡聊redis,一個是底層資料結構的實現,一個是在業務層的使用,這裡就結合一些簡單的python程式碼,講下怎樣用redis 做應用層面的定時器。

    首先,當大批量任務做超時管理,就會涉及到如何實現定時器,使系統開銷最小的問題。通常的底層是一個timer 加一個最小堆(有個自己的實現,可以參考:https://my.oschina.net/u/2950272/blog/1822495),或者是環形陣列,或者是紅黑樹。每種方式都用的還比較多的,nginx 使用的是紅黑樹,golang 底層time 使用的是最小堆。當業務層面要自己實現定時回撥,其實使用redis 也可以做,而且還蠻簡單的,就是zset 就ok。

    具體的思路是怎樣呢?zadd 的時候,val 設定成taskid, score 設定成為timestamp 即可。然後主程式碼,開個執行緒,隔一段時間sleep 去zRangeByScore 當前時間戳到之前標記時間的值,然後就可以簡單的用redis 管理超時任務了。

    具體例子的可以看下面的一個demo:

import redis
import time
import threading

host = "127.0.0.1"
port = "6379"
password = "123123"
db = 0

pool = redis.ConnectionPool(host=host, port=port, password=password, db=db)
rd = redis.StrictRedis(connection_pool=pool)

def put_task(task_id, timeout):
    due_time = int(time.time() + timeout)*100
    print "put", task_id
    rd.zadd("task", task_id, due_time)

def callback(task_id):
    print "consume taskid", task_id

def get_task():
    now = int(time.time() * 100)
    tasks = rd.zrangebyscore("task", 0, now)
    if tasks:
        rd.zremrangebyscore("task", 0, now)
    return tasks

def produce_task():
    while True:
        task_id = int(time.time() * 100)
        put_task(task_id, 1)
        time.sleep(1)

def consume_task():
    print "start loop"

    while True:
        task_ids = get_task()
        if task_ids:
            for task_id in task_ids:
                callback(task_id)

        time.sleep(1)

if __name__ == "__main__":
    t1 = threading.Thread(target=produce_task)
    t2 = threading.Thread(target=consume_task)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

    多個執行緒註冊回撥,一個執行緒隔一段時間去輪訓資料庫即可,具體時間精度,可以控制時間戳和和迴圈最小週期,再明白思路的情況下看這個demo 應該是很好理解的。

    在我們線上測試,內網環境下,qps 在2w 是沒有壓力的。