幾種延遲任務的實現思路
前言
最近有個延遲執行的任務需求,比如發了一個定時紅包,伺服器不能相信客戶端的一切,所以就得做時間的同步,但是PHP相對來講不是很適合做這種“XX秒後去執行一個什麼樣的動作這類的行為”,但是這個功能又是不可缺少的,然後就週末花時間調研了下相關的實現。大致有如下幾種:
- 藉助Redis的sorted_set和hash結構
- 自己寫一個定時器,不斷“輪詢”觸發
- 藉助語言的非同步庫
- 藉助訊息佇列等服務。
下面針對這幾點一一做下簡單的實現, 然後考慮到可維護性, 資料丟失後怎麼恢復,服務監控等一系列問題。最後選擇一個場景上來說更合適的吧。
藉助Redis實現
在正式使用Redis來實現這一延遲需求之前,我還了解到Redis的key notification事件提醒,可以在某一個key過期的時候觸發一個動作,這對於我們做延遲任務來講,的確是很好的一個契機,但是打開了它就會不可避免的造成效率上的降低,而且線上伺服器一般不會再去修改了,因此這個特性,自己瞭解下,玩玩就行了。具體的實現還是得老老實實設計資料結構了。
結構涉及
我的做法是 QUEUE 加上 CONTAINER。即會有一個根據時間不斷往前移動的時間軸作為我們的佇列,然後在佇列上每一個時間戳,作為一個連結串列往外散發,儲存多個task。

涉及描述
生產者productor.php
guo@Server218:/tmp$ cat productor.php <?php $redis = new Redis(); $redis->connect("localhost", 6379); $redis->select(2); $QUEUE = "asyncqueue:zset"; $SERIALIZER = "serialize:hash"; // 模擬生產延遲訊息 for($index=0; $index<10; $index++) { // 每秒可能會產生多條資料,但是隻要“當秒”有資料,就需要新增到queue中 $ts = time(); $cursecond = rand(0, 9) % 2 == 0; $tasklength = rand(0, 9) % 3; if($cursecond == true) { // 當前秒有task $redis->zadd($QUEUE, $ts, $ts); if($tasklength > 0) { for($i=0; $i<$tasklength;$i++) { $key = "2614677&".rand(0, 100000); $redis->hset($SERIALIZER.":".$ts, $key, $key); echo "[{$ts}] cursecond:{$cursecond}, KEY:{$key}\n"; } } } sleep($tasklength); }
消費者consumer.php
guo@Server218:/tmp$ cat consumer.php <?php $redis = new Redis(); $redis->connect("localhost", 6379); $redis->select(2); $QUEUE = "asyncqueue:zset"; $SERIALIZER = "serialize:hash"; $counter = 0; while(true) { $ts = 1542596034 + $counter; $counter++; $ret = $redis->zrangebyscore($QUEUE, $ts, $ts, array("WITHSCORES"=>true)); // 獲取下具體的task並執行 $items = $redis->hgetall($SERIALIZER.":".$ts); foreach($items as $key=>$member) { echo "CONSUMER[{$ts}]\t[{$key}]\t{$member}\n"; } if($counter>=10) { break; } }
測試
先來看看生產的具體內容。
guo@Server218:/tmp$ vim productor.php [1542596034] cursecond:1, KEY:2614677&46685 [1542596034] cursecond:1, KEY:2614677&99086 [1542596036] cursecond:1, KEY:2614677&38241 [1542596037] cursecond:1, KEY:2614677&74988 [1542596038] cursecond:1, KEY:2614677&69443 [1542596038] cursecond:1, KEY:2614677&25523 [1542596040] cursecond:1, KEY:2614677&29642 [1542596040] cursecond:1, KEY:2614677&15928 [1542596042] cursecond:1, KEY:2614677&91626 [1542596042] cursecond:1, KEY:2614677&7382 Press ENTER or type command to continue
然後看看消費者是否正確消費。
Press ENTER or type command to continue CONSUMER[1542596034][2614677&46685] 2614677&46685 CONSUMER[1542596034][2614677&99086] 2614677&99086 CONSUMER[1542596036][2614677&38241] 2614677&38241 CONSUMER[1542596037][2614677&74988] 2614677&74988 CONSUMER[1542596038][2614677&69443] 2614677&69443 CONSUMER[1542596038][2614677&25523] 2614677&25523 CONSUMER[1542596040][2614677&29642] 2614677&29642 CONSUMER[1542596040][2614677&15928] 2614677&15928 CONSUMER[1542596042][2614677&91626] 2614677&91626 CONSUMER[1542596042][2614677&7382]2614677&7382 Press ENTER or type command to continue
談談看法
- 利用Redis來實現,可以看出對Redis伺服器的QPS會有一個微幅提升,這個問題可以通過multi管道來稍微優化下,這裡就不多說了。
- 資料不會丟,這樣即便是服務掛掉也能將未消費的任務進行恢復。
- 服務監控以及可維護性尚佳,基於Redis,穩定效能得到保證。
- 不用切換語言,易於實現,也無需增加額外的中介軟體,減少了維護工作。
定時器⏲
原理
在網上搜索相關實現的時候,搜到一篇不錯的文章。 ofollow,noindex">golang實現延遲訊息的原理與方法 不錯的文章,核心思路就在於下面這張圖了。

定時器原理
程式碼實現
原文程式碼中有一個bug,就是在 執行任務輪詢 的時候沒有做休眠,會導致服務一直全速前進,這不太好。修改後的程式碼如下:
➜asyncdemos cat delayring.go package main import ( "time" "errors" "fmt" "github.com/kataras/iris" "net/http" "bytes" "log" "io/ioutil" "encoding/json" "github.com/garyburd/redigo/redis" "strconv" ) const ( TASK_TYPE_INTERVAL = 1 TASK_TYPE_DELAY = 2 QUEUE_LENGTH = 10 DINGTALK_WEBHOOK = "https://oapi.dingtalk.com/robot/send?access_token=b716e1f9f2f7d4fb93d4bb79db65a117d589f886d1757" ) //延遲訊息 type DelayMessage struct { //當前下標 curIndex int; //環形槽 slots [QUEUE_LENGTH]map[string]*Task; //關閉 closed chan bool; //任務關閉 taskClose chan bool; //時間關閉 timeClose chan bool; //啟動時間 startTime time.Time; } //執行的任務函式 type TaskFunc func(args ...interface{}); //任務 type Task struct { //迴圈次數 cycleNum int; //執行的函式 execTaskFunc; params []interface{}; catagory int } //建立一個延遲訊息 func NewDelayMessage() *DelayMessage { dm := &DelayMessage{ curIndex:0, closed:make(chan bool), taskClose: make(chan bool), timeClose: make(chan bool), startTime: time.Now(), }; for i := 0; i < QUEUE_LENGTH; i++ { dm.slots[i] = make(map[string]*Task); } return dm; } //啟動延遲訊息 func (dm *DelayMessage) Start() { go dm.taskLoop(); go dm.timeLoop(); select { case <-dm.closed: { dm.taskClose <- true; dm.timeClose <- true; break; } }; } //關閉延遲訊息 func (dm *DelayMessage) Close() { dm.closed <- true; } //處理每1秒的任務 func (dm *DelayMessage) taskLoop() { defer func() { fmt.Println("taskLoop exit"); }(); for { // TODO 看看怎麼優化比較合適,要不加這個的話,程式會執行超過一次 time.Sleep(time.Second) select { case <-dm.taskClose: { return; } default: { //取出當前的槽的任務 tasks := dm.slots[dm.curIndex]; if len(tasks) > 0 { //遍歷任務,判斷任務迴圈次數等於0,則執行任務 //否則任務迴圈次數減1 for k, v := range tasks { if v.cycleNum == 0 { fmt.Printf("\t\t\t\t\tCURINDEX[%v], key: %v, cyclenum: %v\n", dm.curIndex, k, v.cycleNum) go v.exec(v.params...); //刪除執行過的任務 對於catagory=1的週期性任務不予刪除 if v.catagory != TASK_TYPE_INTERVAL { delete(tasks, k) } } else { v.cycleNum--; } } } } } } } //處理每1秒移動下標 func (dm *DelayMessage) timeLoop() { defer func() { fmt.Println("timeLoop exit"); }(); tick := time.NewTicker(time.Second); for { select { case <-dm.timeClose: { return; } case <-tick.C: { fmt.Printf("%v, [%v]\n", time.Now().Format("2006-01-02 15:04:05"), dm.curIndex); //fmt.Println(dm.slots) //判斷當前下標,如果等於3599則重置為0,否則加1 if dm.curIndex == QUEUE_LENGTH - 1 { dm.curIndex = 0; } else { dm.curIndex++; } } } } } //新增任務 //func (dm *DelayMessage) AddTask(t time.Time, key string, catagory int, exec TaskFunc, params []interface{}) error { func (dm *DelayMessage) AddTask(seconds int, key string, catagory int, exec TaskFunc, params []interface{}) error { //if dm.startTime.After(t) { //return errors.New("時間錯誤"); //} //當前時間與指定時間相差秒數 //subSecond := t.Unix() - dm.startTime.Unix(); //subSecond := int(t.Unix() - time.Now().Unix()); subSecond := seconds //計算迴圈次數 cycleNum := int(subSecond / QUEUE_LENGTH); //計算任務所在的slots的下標 ix := (subSecond + dm.curIndex ) % QUEUE_LENGTH ; fmt.Printf("\t\t\t\t\t key: %v, cycle: %v, index: %v , curIndex: %v, subseconds: %v\n", key, cycleNum, ix, dm.curIndex, subSecond) //把任務加入tasks中 tasks := dm.slots[ix]; if _, ok := tasks[key]; ok { return errors.New("該slots中已存在key為" + key + "的任務"); } tasks[key] = &Task{ cycleNum: cycleNum, exec:exec, params:params, catagory: catagory, }; // TODO 持久化部分,這樣即便中途crash,下次重啟也能得到及時的恢復 return nil; } func (dm *DelayMessage) DeleteTask(key string) error { tasks := dm.slots[dm.curIndex] if _, ok := tasks[key]; ok { delete(tasks, key) } return nil } //func main() { //建立延遲訊息 //dm := NewDelayMessage(); ////新增任務 //dm.AddTask(time.Now().Add(time.Second*2), "test1", TASK_TYPE_DELAY, func(args ...interface{}) { //fmt.Println(args...); //}, []interface{}{1, 2, 3}); //dm.AddTask(time.Now().Add(time.Second*4), "test2", TASK_TYPE_DELAY,func(args ...interface{}) { //fmt.Println(args...); //}, []interface{}{4, 5, 6}); //dm.AddTask(time.Now().Add(time.Second*12), "test3", TASK_TYPE_DELAY, func(args ...interface{}) { //fmt.Println(args...); //}, []interface{}{"hello", "world", "test"}); //dm.AddTask(time.Now().Add(time.Second), "test4", TASK_TYPE_INTERVAL, func(args ...interface{}) { //fmt.Printf("操你媽", args...) //}, []interface{}{1, 2, 3}); // ////40秒後關閉 ////time.AfterFunc(time.Second*2, func() { //////dm.Close(); ////}); //dm.Start(); //} var mamager DelayMessage //func publish(manager *DelayMessage, seconds int, key string, exec TaskFunc, params []interface{}) error { //manager.AddTask(time.Now().Add(time.Second * time.Duration(seconds)), key, TASK_TYPE_DELAY, func(args... interface{}) { //fmt.Println("key: " + key) //}, params) // //return nil //} func httpPost(msg string, webhook string) { formatter := `{ "msgtype": "text", "text": { "content":"%s", }, "at": { "atMobiles":[], "isAtAll": false } } ` content := fmt.Sprintf(formatter, msg + "[" + time.Now().String() + "]") payload := []byte(content) resp, err := http.Post(webhook, "application/json", bytes.NewBuffer(payload)) if err != nil { log.Fatal(err) } defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) if err != nil { log.Fatal(err) } fmt.Println(string(body)) } type Message struct { Sessionid string Anchorid string Msg string } func RedisPublish(info string) { client, err := redis.Dial("tcp", "127.0.0.1:6379") if err != nil { log.Fatal(err) return } defer client.Close() resp, err := client.Do("publish", "channel", info) if err != nil { // TODO 跳轉到監控報警 log.Fatal(err) return } fmt.Println(resp) } func main() { manager := NewDelayMessage() go manager.Start() app := iris.New() app.Get("/hello", func(context iris.Context) { context.WriteString("pong") }) app.Get("/publish", func(context iris.Context) { msg := context.FormValue("msg") seconds, _ := strconv.Atoi(context.FormValue("seconds")) catagory, _ := strconv.Atoi(context.FormValue("catagory")) fmt.Println("get params: " + msg) if catagory != TASK_TYPE_DELAY || catagory != TASK_TYPE_INTERVAL { catagory = TASK_TYPE_DELAY } manager.AddTask(seconds, "test1", catagory, func(args ...interface{}) { httpPost(args[0].(string), DINGTALK_WEBHOOK) }, []interface{}{msg}) context.WriteString("Added Succeed!" + time.Now().String()) }) app.Get("/delay", func(ctx iris.Context) { message := Message{ Sessionid: ctx.FormValue("sessionid"), Anchorid: ctx.FormValue("anchorid"), Msg:ctx.FormValue("msg"), } jsondata, err := json.Marshal(&message) if err != nil { ctx.WriteString(err.Error()) } RedisPublish(string(jsondata)) ctx.WriteString(string(jsondata)) }) app.Run(iris.Addr(":8080")) }%➜
測試
開啟服務 go run delayring.go
, 然後在瀏覽器中訪問服務,大致含義是3秒後觸發一個 timeout 事件,觸發釘釘機器人訊息推送。

釋出延遲任務
➜asyncdemos go run delayring.go Now listening on: http://localhost:8080 Application started. Press CMD+C to shut down. 2018-11-19 11:35:24, [0] get params: 難受 key: test1, cycle: 0, index: 4 , curIndex: 1, subseconds: 3 2018-11-19 11:35:25, [1] 2018-11-19 11:35:26, [2] 2018-11-19 11:35:27, [3] CURINDEX[4], key: test1, cyclenum: 0 {"errmsg":"ok","errcode":0} 2018-11-19 11:35:28, [4] 2018-11-19 11:35:29, [5] 2018-11-19 11:35:30, [6] 2018-11-19 11:35:31, [7] ^C[ERRO] 2018/11/19 11:35 http: Server closed

機器人訊息推送監控結果
談談感受
- 仔細看測試結果,發現時間戳和對應執行時間戳還是可以對的上的。但是有一個極大的弊端就是資料。萬一服務掛掉了,資料就會全部丟掉,這是不能容忍的。
- 程式碼可維護性也較低,當然了,程式碼沒做啥涉及,封裝的不夠完善。
- 引入了 額外的服務 , 導致整個系統的可維護性降低,增大了服務宕機的危險。
- 語言相關性較強,對非golang的業務程式有一定的門檻。
藉助第三方庫
python的tornado一向以非同步高效率著稱,非同步對它來說就是個普通的業務。所以我們無需考慮具體的實現細節,專注於業務邏輯即可。那麼今天咱也來試試水。
程式碼實現
很幸運的一下子就搜到了對應的demo,如下:
➜asyncdemos cat demo.py #coding: utf8 __author__ = "郭 璞" __email__ = "[email protected]" import tornado.httpserver import tornado.ioloop import tornado.options import tornado.web import tornado.httpclient import tornado.gen from tornado.concurrent import run_on_executor from concurrent.futures import ThreadPoolExecutor import time from tornado.options import define, options define("port", default=8002, help="run on the port", type=int) class SleepHandler(tornado.web.RequestHandler): executor = ThreadPoolExecutor(2) def get(self): seconds = self.request.arguments.get("seconds", 10) tornado.ioloop.IOLoop.instance().add_callback(self.sleep, seconds) self.write("when i sleep") @run_on_executor def sleep(self, seconds): print(time.time()) time.sleep(5) print("yes", seconds) print(time.time()) return seconds if __name__ == "__main__": # tornado.options.parse_command_line() app = tornado.web.Application( handlers=[(r"/sleep", SleepHandler), ]) http_server = tornado.httpserver.HTTPServer(app) http_server.listen(8002) tornado.ioloop.IOLoop.instance().start()%
執行服務: python demo.py
, 然後訪問服務:

訪問tornado服務
檢視下輸出結果

服務輸出結果
這裡使用了預設值引數,所以可以看出也是正確的,服務在第三秒後得到了觸發並進行了對應的執行操作。
談談感受
- 庫支援,無需考慮底層細節,專注於業務流程即可。
- 面臨著和自己寫定時器一樣的問題,那就是資料的同步,以及錯誤恢復等。
- 引入了第三方服務,系統可維護性以及宕機的可能性變大。
藉助開源軟體
在和周圍人的討論中,發現 延遲執行 的一個解決方案就是採用訊息佇列。比如beanstalk和rabbitMQ等。我沒去調研rabbitMQ怎麼用,這塊內容挺大的,光是那一大坨的配置檔案就讓人頭大,所以我傾向於使用beanstalk。
配置環境
用之前進行安裝, 啟動即可。
# 安裝 sudo apt-get install beanstalkd # 啟動, 並後臺執行。如果覺得不保險,還可以用nohup的形式 beanstalkd -l 127.0.0.1 -p 12345 &
使用的細節可以參考下面的這篇文章。 PHP使用Beanstalkd訊息佇列
我這裡問了方便自己看下原型效果,就用python簡單寫寫了。開始之前記得安裝beanstalk的依賴庫 beanstalkc ,
pip install beanstalkc
程式碼實現
先來看看生產者。
guo@Server218:/tmp$ cat beanstalkdemo.py #!/usr/bin python import beanstalkc import time conn = beanstalkc.Connection(host="localhost", port=12345) print(conn.tubes()) print(conn.stats()) conn.use("default") ts = time.time() handletime = ts + 10 conn.put("helloworld" + str(ts) + ", handletime:" + str(handletime), 1, 10) print("putted")
再來看看消費者。
guo@Server218:/tmp$ cat consumer.py #!/usr/bin python import beanstalkc import time conn = beanstalkc.Connection(host="localhost", port=12345) conn.use("default") job = conn.reserve() print(job.body) job.delete() ts = time.time() print("CONSUME DONE: " + str(ts))
測試
- 先執行生產者。
guo@Server218:/tmp$ python beanstalkdemo.py ['default'] {'current-connections': 1, 'max-job-size': 65535, 'cmd-release': 0, 'cmd-reserve': 0, 'pid': 8384, 'cmd-bury': 0, 'current-producers': 0, 'total-jobs': 0, 'current-jobs-ready': 0, 'cmd-peek-buried': 0, 'current-tubes': 1, 'id': 'b0b7cf3b44c2e296', 'current-jobs-delayed': 0, 'uptime': 2, 'cmd-watch': 0, 'hostname': 'Server218', 'job-timeouts': 0, 'cmd-stats': 1, 'rusage-stime': 0.0, 'version': 1.1, 'current-jobs-reserved': 0, 'current-jobs-buried': 0, 'cmd-reserve-with-timeout': 0, 'cmd-put': 0, 'cmd-pause-tube': 0, 'cmd-list-tubes-watched': 0, 'cmd-list-tubes': 1, 'current-workers': 0, 'cmd-list-tube-used': 0, 'cmd-ignore': 0, 'binlog-records-migrated': 0, 'current-waiting': 0, 'cmd-peek': 0, 'cmd-peek-ready': 0, 'cmd-peek-delayed': 0, 'cmd-touch': 0, 'binlog-oldest-index': 0, 'binlog-current-index': 0, 'cmd-use': 0, 'total-connections': 1, 'cmd-delete': 0, 'binlog-max-size': 10485760, 'cmd-stats-job': 0, 'rusage-utime': 0.0, 'cmd-stats-tube': 0, 'binlog-records-written': 0, 'cmd-kick': 0, 'current-jobs-urgent': 0} putted
- 跑一下消費者,看看效果。
guo@Server218:/tmp$ python consumer.py helloworld1542605160.63, handletime:1542605170.63 CONSUME DONE: 1542605172.33
從上面可以看出, 延遲執行 的目標已經實現了。
談談感受
- 引入了第三方服務,造成了維護成本的增加。
- 解耦性比較好,語言無關。
- 資料可較好的儲存,不至於丟失資料,容錯性好。
總結
調研了這麼多,發現每一個都有自己的優缺點吧,沒有說哪一個是最好的選擇,只能算是合適的場景選擇合適的服務。
且行且思