1. 程式人生 > >延遲任務的幾種實現思路

延遲任務的幾種實現思路

前言

最近有個延遲執行的任務需求,比如發了一個定時紅包,伺服器不能相信客戶端的一切,所以就得做時間的同步,但是PHP相對來講不是很適合做這種“XX秒後去執行一個什麼樣的動作這類的行為”,但是這個功能又是不可缺少的,然後就週末花時間調研了下相關的實現。大致有如下幾種:

  • 藉助Redis的sorted_set和hash結構
  • 自己寫一個定時器,不斷“輪詢”觸發
  • 藉助語言的非同步庫
  • 藉助訊息佇列等服務。

下面針對這幾點一一做下簡單的實現, 然後考慮到可維護性, 資料丟失後怎麼恢復,服務監控等一系列問題。最後選擇一個場景上來說更合適的吧。

藉助Redis實現

在正式使用Redis來實現這一延遲需求之前,我還了解到Redis的key notification事件提醒,可以在某一個key過期的時候觸發一個動作,這對於我們做延遲任務來講,的確是很好的一個契機,但是打開了它就會不可避免的造成效率上的降低,而且線上伺服器一般不會再去修改了,因此這個特性,自己瞭解下,玩玩就行了。具體的實現還是得老老實實設計資料結構了。

結構涉及

我的做法是 QUEUE 加上 CONTAINER。即會有一個根據時間不斷往前移動的時間軸作為我們的佇列,然後在佇列上每一個時間戳,作為一個連結串列往外散發,儲存多個task。
涉及描述

生產者productor.php

[email protected]
:/tmp$ cat productor.php <?php $redis = new Redis(); $redis->connect("localhost", 6379); $redis->select(2); $QUEUE = "asyncqueue:zset"; $SERIALIZER = "serialize:hash"; // 模擬生產延遲訊息 for($index=0; $index<10; $index++) { // 每秒可能會產生多條資料,但是隻要“當秒”有資料,就需要新增到queue中 $ts = time(); $cursecond = rand(0, 9) % 2 == 0; $tasklength = rand(0, 9) % 3; if($cursecond == true) { // 當前秒有task $redis->zadd($QUEUE, $ts, $ts); if($tasklength > 0) { for($i=0; $i<$tasklength;$i++) { $key = "2614677&".rand(0, 100000); $redis->hset($SERIALIZER.":".$ts, $key, $key); echo "[{$ts}] cursecond:{$cursecond}, KEY:{$key}\n"; } } } sleep($tasklength); }

消費者consumer.php

  [email protected]:/tmp$ cat consumer.php
<?php
$redis = new Redis();
$redis->connect("localhost", 6379);
$redis->select(2);
$QUEUE = "asyncqueue:zset";
$SERIALIZER = "serialize:hash";
$counter = 0;
while(true) {
    $ts = 1542596034 + $counter;
    $counter++;
    $ret = $redis->zrangebyscore($QUEUE, $ts, $ts, array("WITHSCORES"=>true));
    // 獲取下具體的task並執行
    $items = $redis->hgetall($SERIALIZER.":".$ts);
    foreach($items as $key=>$member) {
        echo "CONSUMER[{$ts}]\t[{$key}]\t{$member}\n";
    }
    if($counter>=10) {
        break;
    }
}

測試

先來看看生產的具體內容。

[email protected]:/tmp$ vim productor.php
[1542596034] cursecond:1, KEY:2614677&46685
[1542596034] cursecond:1, KEY:2614677&99086
[1542596036] cursecond:1, KEY:2614677&38241
[1542596037] cursecond:1, KEY:2614677&74988
[1542596038] cursecond:1, KEY:2614677&69443
[1542596038] cursecond:1, KEY:2614677&25523
[1542596040] cursecond:1, KEY:2614677&29642
[1542596040] cursecond:1, KEY:2614677&15928
[1542596042] cursecond:1, KEY:2614677&91626
[1542596042] cursecond:1, KEY:2614677&7382

Press ENTER or type command to continue

然後看看消費者是否正確消費。

Press ENTER or type command to continue
CONSUMER[1542596034]	[2614677&46685]	2614677&46685
CONSUMER[1542596034]	[2614677&99086]	2614677&99086
CONSUMER[1542596036]	[2614677&38241]	2614677&38241
CONSUMER[1542596037]	[2614677&74988]	2614677&74988
CONSUMER[1542596038]	[2614677&69443]	2614677&69443
CONSUMER[1542596038]	[2614677&25523]	2614677&25523
CONSUMER[1542596040]	[2614677&29642]	2614677&29642
CONSUMER[1542596040]	[2614677&15928]	2614677&15928
CONSUMER[1542596042]	[2614677&91626]	2614677&91626
CONSUMER[1542596042]	[2614677&7382]	2614677&7382

Press ENTER or type command to continue

談談看法

  • 利用Redis來實現,可以看出對Redis伺服器的QPS會有一個微幅提升,這個問題可以通過multi管道來稍微優化下,這裡就不多說了。
  • 資料不會丟,這樣即便是服務掛掉也能將未消費的任務進行恢復。
  • 服務監控以及可維護性尚佳,基於Redis,穩定效能得到保證。
  • 不用切換語言,易於實現,也無需增加額外的中介軟體,減少了維護工作。

定時器⏲

原理

在網上搜索相關實現的時候,搜到一篇不錯的文章。golang實現延遲訊息的原理與方法 不錯的文章,核心思路就在於下面這張圖了。
定時器原理

程式碼實現

原文程式碼中有一個bug,就是在執行任務輪詢的時候沒有做休眠,會導致服務一直全速前進,這不太好。修改後的程式碼如下:

➜  asyncdemos cat delayring.go
package main

import (
	"time"
	"errors"
	"fmt"
	"github.com/kataras/iris"
	"net/http"
	"bytes"
	"log"
	"io/ioutil"
	"encoding/json"
	"github.com/garyburd/redigo/redis"
	"strconv"
)

const (
	TASK_TYPE_INTERVAL = 1
	TASK_TYPE_DELAY = 2
	QUEUE_LENGTH = 10
	DINGTALK_WEBHOOK = "https://oapi.dingtalk.com/robot/send?access_token=b716e1f39b7fc7afbea04b23909f2f779db65a117d589f886d1757"

)

//延遲訊息
type DelayMessage struct {
	//當前下標
	curIndex int;
	//環形槽
	slots [QUEUE_LENGTH]map[string]*Task;
	//關閉
	closed chan bool;
	//任務關閉
	taskClose chan bool;
	//時間關閉
	timeClose chan bool;
	//啟動時間
	startTime time.Time;
}

//執行的任務函式
type TaskFunc func(args ...interface{});

//任務
type Task struct {
	//迴圈次數
	cycleNum int;
	//執行的函式
	exec   TaskFunc;
	params []interface{};
    catagory int
}

//建立一個延遲訊息
func NewDelayMessage() *DelayMessage {
	dm := &DelayMessage{
		curIndex:  0,
		closed:    make(chan bool),
		taskClose: make(chan bool),
		timeClose: make(chan bool),
		startTime: time.Now(),
	};
	for i := 0; i < QUEUE_LENGTH; i++ {
		dm.slots[i] = make(map[string]*Task);
	}
	return dm;
}

//啟動延遲訊息
func (dm *DelayMessage) Start() {
	go dm.taskLoop();
	go dm.timeLoop();
	select {
	case <-dm.closed:
		{
			dm.taskClose <- true;
			dm.timeClose <- true;
			break;
		}
	};
}

//關閉延遲訊息
func (dm *DelayMessage) Close() {
	dm.closed <- true;
}

//處理每1秒的任務
func (dm *DelayMessage) taskLoop() {
	defer func() {
		fmt.Println("taskLoop exit");
	}();
	for {
		// TODO 看看怎麼優化比較合適,要不加這個的話,程式會執行超過一次
		time.Sleep(time.Second)
		select {
		case <-dm.taskClose:
			{
				return;
			}
		default:
			{
				//取出當前的槽的任務
				tasks := dm.slots[dm.curIndex];
				if len(tasks) > 0 {
					//遍歷任務,判斷任務迴圈次數等於0,則執行任務
					//否則任務迴圈次數減1
					for k, v := range tasks {
						if v.cycleNum == 0 {
							fmt.Printf("\t\t\t\t\tCURINDEX[%v], key: %v, cyclenum: %v\n", dm.curIndex, k, v.cycleNum)
							go v.exec(v.params...);
							//刪除執行過的任務 對於catagory=1的週期性任務不予刪除
							if v.catagory != TASK_TYPE_INTERVAL {
								delete(tasks, k)
							}
						} else {
							v.cycleNum--;
						}
					}
				}
			}
		}
	}
}

//處理每1秒移動下標
func (dm *DelayMessage) timeLoop() {
	defer func() {
		fmt.Println("timeLoop exit");
	}();
	tick := time.NewTicker(time.Second);
	for {
		select {
		case <-dm.timeClose:
			{
				return;
			}
		case <-tick.C:
			{
				fmt.Printf("%v, [%v]\n", time.Now().Format("2006-01-02 15:04:05"), dm.curIndex);
				//fmt.Println(dm.slots)
				//判斷當前下標,如果等於3599則重置為0,否則加1
				if dm.curIndex == QUEUE_LENGTH - 1 {
					dm.curIndex = 0;
				} else {
					dm.curIndex++;
				}
			}
		}
	}
}

//新增任務
//func (dm *DelayMessage) AddTask(t time.Time, key string, catagory int, exec TaskFunc, params []interface{}) error {
func (dm *DelayMessage) AddTask(seconds int, key string, catagory int, exec TaskFunc, params []interface{}) error {
	//if dm.startTime.After(t) {
	//	return errors.New("時間錯誤");
	//}
	//當前時間與指定時間相差秒數
	//subSecond := t.Unix() - dm.startTime.Unix();
	//subSecond := int(t.Unix() - time.Now().Unix());
	subSecond := seconds
	//計算迴圈次數
	cycleNum := int(subSecond / QUEUE_LENGTH);
	//計算任務所在的slots的下標
	ix := (subSecond + dm.curIndex ) % QUEUE_LENGTH ;
	fmt.Printf("\t\t\t\t\t key: %v, cycle: %v, index: %v , curIndex: %v, subseconds: %v\n", key, cycleNum, ix, dm.curIndex, subSecond)
	//把任務加入tasks中
	tasks := dm.slots[ix];
	if _, ok := tasks[key]; ok {
		return errors.New("該slots中已存在key為" + key + "的任務");
	}
	tasks[key] = &Task{
		cycleNum: cycleNum,
		exec:     exec,
		params:   params,
		catagory: catagory,
	};
	// TODO 持久化部分,這樣即便中途crash,下次重啟也能得到及時的恢復
	return nil;
}

func (dm *DelayMessage) DeleteTask(key string) error {
	tasks := dm.slots[dm.curIndex]
	if _, ok := tasks[key]; ok {
		delete(tasks, key)
	}
	return nil
}

//func main() {
	//建立延遲訊息
	//dm := NewDelayMessage();
	////新增任務
	//dm.AddTask(time.Now().Add(time.Second*2), "test1", TASK_TYPE_DELAY, func(args ...interface{}) {
	//	fmt.Println(args...);
	//}, []interface{}{1, 2, 3});
	//dm.AddTask(time.Now().Add(time.Second*4), "test2", TASK_TYPE_DELAY,  func(args ...interface{}) {
	//	fmt.Println(args...);
	//}, []interface{}{4, 5, 6});
	//dm.AddTask(time.Now().Add(time.Second*12), "test3", TASK_TYPE_DELAY, func(args ...interface{}) {
	//	fmt.Println(args...);
	//}, []interface{}{"hello", "world", "test"});
	//dm.AddTask(time.Now().Add(time.Second), "test4", TASK_TYPE_INTERVAL, func(args ...interface{}) {
	//	fmt.Printf("操你媽", args...)
	//}, []interface{}{1, 2, 3});
	//
	////40秒後關閉
	////time.AfterFunc(time.Second*2, func() {
	////	//dm.Close();
	////});
	//dm.Start();
//}


var mamager DelayMessage


//func publish(manager *DelayMessage, seconds int, key string, exec TaskFunc, params []interface{}) error {
//	manager.AddTask(time.Now().Add(time.Second * time.Duration(seconds)), key, TASK_TYPE_DELAY, func(args... interface{}) {
//		fmt.Println("key: " + key)
//	}, params)
//
//	return nil
//}

func httpPost(msg string, webhook string) {
	formatter := `{
            "msgtype": "text",
            "text": {
                "content":"%s",
            },
			"at": {
           	    "atMobiles":[],
				"isAtAll": false
           }
        }
    `
	content := fmt.Sprintf(formatter, msg + "[" + time.Now().String() + "]")
	payload := []byte(content)
	resp, err := http.Post(webhook, "application/json", bytes.NewBuffer(payload))
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(body))
}

type Message struct {
	Sessionid string
	Anchorid string
	Msg string
}

func RedisPublish(info string) {
	client, err := redis.Dial("tcp", "127.0.0.1:6379")
	if err != nil {
		log.Fatal(err)
		return
	}
	defer client.Close()
	resp, err := client.Do("publish", "channel", info)
	if err != nil {
		// TODO 跳轉到監控報警
		log.Fatal(err)
		return
	}
	fmt.Println(resp)
}

func main() {
	manager := NewDelayMessage()
	go manager.Start()
	app := iris.New()
	app.Get("/hello", func(context iris.Context) {
		context.WriteString("pong")
	})
	app.Get("/publish", func(context iris.Context) {
		msg := context.FormValue("msg")
		seconds, _ := strconv.Atoi(context.FormValue("seconds"))
		catagory, _ := strconv.Atoi(context.FormValue("catagory"))
		fmt.Println("get params: " + msg)
		if catagory != TASK_TYPE_DELAY || catagory != TASK_TYPE_INTERVAL {
			catagory = TASK_TYPE_DELAY
		}
		manager.AddTask(seconds, "test1", catagory, func(args ...interface{}) {
			httpPost(args[0].(string), DINGTALK_WEBHOOK)
		}, []interface{}{msg})
		context.WriteString("Added Succeed!" + time.Now().String())
	})
	app.Get("/delay", func(ctx iris.Context) {
		message := Message{
			Sessionid: ctx.FormValue("sessionid"),
			Anchorid: ctx.FormValue("anchorid"),
			Msg:ctx.FormValue("msg"),
		}
		jsondata, err := json.Marshal(&message)
		if err != nil {
			ctx.WriteString(err.Error())
		}
		RedisPublish(string(jsondata))
		ctx.WriteString(string(jsondata))

	})
	app.Run(iris.Addr(":8080"))

}%                                                                                                                                                                              ➜

測試

開啟服務go run delayring.go, 然後在瀏覽器中訪問服務,大致含義是3秒後觸發一個timeout事件,觸發釘釘機器人訊息推送。
釋出延遲任務

➜  asyncdemos go run delayring.go
Now listening on: http://localhost:8080
Application started. Press CMD+C to shut down.
2018-11-19 11:35:24, [0]
get params: 難受
					 key: test1, cycle: 0, index: 4 , curIndex: 1, subseconds: 3
2018-11-19 11:35:25, [1]
2018-11-19 11:35:26, [2]
2018-11-19 11:35:27, [3]
					CURINDEX[4], key: test1, cyclenum: 0
{"errmsg":"ok","errcode":0}
2018-11-19 11:35:28, [4]
2018-11-19 11:35:29, [5]
2018-11-19 11:35:30, [6]
2018-11-19 11:35:31, [7]
^C[ERRO] 2018/11/19 11:35 http: Server closed

機器人訊息推送監控結果

談談感受

  • 仔細看測試結果,發現時間戳和對應執行時間戳還是可以對的上的。但是有一個極大的弊端就是資料。萬一服務掛掉了,資料就會全部丟掉,這是不能容忍的。
  • 程式碼可維護性也較低,當然了,程式碼沒做啥涉及,封裝的不夠完善。
  • 引入了額外的服務, 導致整個系統的可維護性降低,增大了服務宕機的危險。
  • 語言相關性較強,對非golang的業務程式有一定的門檻。

藉助第三方庫

python的tornado一向以非同步高效率著稱,非同步對它來說就是個普通的業務。所以我們無需考慮具體的實現細節,專注於業務邏輯即可。那麼今天咱也來試試水。

程式碼實現

很幸運的一下子就搜到了對應的demo,如下:

➜  asyncdemos cat demo.py
#coding: utf8
__author__ = "郭 璞"
__email__ = "[email protected]"

import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
import tornado.httpclient
import tornado.gen
from tornado.concurrent import run_on_executor
from concurrent.futures import ThreadPoolExecutor
import time
from tornado.options import define, options

define("port", default=8002, help="run on the port", type=int)

class SleepHandler(tornado.web.RequestHandler):
    executor = ThreadPoolExecutor(2)

    def get(self):
        seconds = self.request.arguments.get("seconds", 10)
        tornado.ioloop.IOLoop.instance().add_callback(self.sleep, seconds)
        self.write("when i sleep")

    @run_on_executor
    def sleep(self, seconds):
        print(time.time())
        time.sleep(5)
        print("yes", seconds)
        print(time.time())
        return seconds

if __name__ == "__main__":
    # tornado.options.parse_command_line()
    app = tornado.web.Application(
        handlers=[(r"/sleep", SleepHandler), ])
    http_server = tornado.httpserver.HTTPServer(app)
    http_server.listen(8002)
    tornado.ioloop.IOLoop.instance().start()%

執行服務: python demo.py, 然後訪問服務:
訪問tornado服務

檢視下輸出結果
服務輸出結果

這裡使用了預設值引數,所以可以看出也是正確的,服務在第三秒後得到了觸發並進行了對應的執行操作。

談談感受

  • 庫支援,無需考慮底層細節,專注於業務流程即可。
  • 面臨著和自己寫定時器一樣的問題,那就是資料的同步,以及錯誤恢復等。
  • 引入了第三方服務,系統可維護性以及宕機的可能性變大。

藉助開源軟體

在和周圍人的討論中,發現延遲執行的一個解決方案就是採用訊息佇列。比如beanstalk和rabbitMQ等。我沒去調研rabbitMQ怎麼用,這塊內容挺大的,光是那一大坨的配置檔案就讓人頭大,所以我傾向於使用beanstalk。

配置環境

用之前進行安裝, 啟動即可。

# 安裝
sudo apt-get install beanstalkd
# 啟動, 並後臺執行。如果覺得不保險,還可以用nohup的形式
beanstalkd -l 127.0.0.1 -p 12345 &

使用的細節可以參考下面的這篇文章。PHP使用Beanstalkd訊息佇列

我這裡問了方便自己看下原型效果,就用python簡單寫寫了。開始之前記得安裝beanstalk的依賴庫beanstalkc

pip install beanstalkc

程式碼實現

先來看看生產者。

[email protected]:/tmp$ cat beanstalkdemo.py
#!/usr/bin python
import beanstalkc
import time
conn = beanstalkc.Connection(host="localhost", port=12345)
print(conn.tubes())
print(conn.stats())
conn.use("default")
ts = time.time()
handletime = ts + 10
conn.put("helloworld" + str(ts) + ", handletime:" + str(handletime), 1, 10)
print("putted")

再來看看消費者。

[email protected]:/tmp$ cat consumer.py
#!/usr/bin python
import beanstalkc
import time
conn = beanstalkc.Connection(host="localhost", port=12345)
conn.use("default")
job = conn.reserve()
print(job.body)
job.delete()
ts = time.time()
print("CONSUME DONE: " + str(ts))

測試

  • 先執行生產者。
[email protected]:/tmp$ python beanstalkdemo.py
['default']
{'current-connections': 1, 'max-job-size': 65535, 'cmd-release': 0, 'cmd-reserve': 0, 'pid': 8384, 'cmd-bury': 0, 'current-producers': 0, 'total-jobs': 0, 'current-jobs-ready': 0, 'cmd-peek-buried': 0, 'current-tubes': 1, 'id': 'b0b7cf3b44c2e296', 'current-jobs-delayed': 0, 'uptime': 2, 'cmd-watch': 0, 'hostname': 'Server218', 'job-timeouts': 0, 'cmd-stats': 1, 'rusage-stime': 0.0, 'version': 1.1, 'current-jobs-reserved': 0, 'current-jobs-buried': 0, 'cmd-reserve-with-timeout': 0, 'cmd-put': 0, 'cmd-pause-tube': 0, 'cmd-list-tubes-watched': 0, 'cmd-list-tubes': 1, 'current-workers': 0, 'cmd-list-tube-used': 0, 'cmd-ignore': 0, 'binlog-records-migrated': 0, 'current-waiting': 0, 'cmd-peek': 0, 'cmd-peek-ready': 0, 'cmd-peek-delayed': 0, 'cmd-touch': 0, 'binlog-oldest-index': 0, 'binlog-current-index': 0, 'cmd-use': 0, 'total-connections': 1, 'cmd-delete': 0, 'binlog-max-size': 10485760, 'cmd-stats-job': 0, 'rusage-utime': 0.0, 'cmd-stats-tube': 0, 'binlog-records-written': 0, 'cmd-kick': 0, 'current-jobs-urgent': 0}
putted
  • 跑一下消費者,看看效果。
[email protected]:/tmp$ python consumer.py
helloworld1542605160.63, handletime:1542605170.63
CONSUME DONE: 1542605172.33

從上面可以看出,延遲執行的目標已經實現了。

談談感受

  • 引入了第三方服務,造成了維護成本的增加。
  • 解耦性比較好,語言無關。
  • 資料可較好的儲存,不至於丟失資料,容錯性好。

總結

調研了這麼多,發現每一個都有自己的優缺點吧,沒有說哪一個是最好的選擇,只能算是合適的場景選擇合適的服務。

且行且思