1. 程式人生 > golang 超時佇列實現與使用

golang 超時佇列實現與使用

超時佇列

https://github.com/fwhezfwhez/go-queue
目前業務上需要一個速度快、資料超時後會自動刪除的記憶體佇列,實現和使用方式如下:

package  main
import (
	"fmt"
	"time"

	queue "github.com/fwhezfwhez/go-queue"
)
func main() {
    //初始化,init
    q:= queue.NewEmpty()
    //壓入,push
    q.Push(5)
    q.Push(4)
    //列印,print
    q.Print()
    //出列,pop
    fmt.Println
(q.Pop()) //列印,print q.Print() //長度,len fmt.Println(q.Length()) //併發安全壓入,currently safe push q.SafePush(6) //併發安全出列,currently safe pop fmt.Print(q.SafePop()) q.Print() // time queue tq := queue.TimeQueueWithTimeStep(10*time.Second, 50, 1*time.Nanosecond) tq.StartTimeSpying
() tq.TPush(5) tq.SafeTPush(6) fmt.Println("init:") tq.Print() time.Sleep(5 * time.Second) fmt.Println("after 5s:") tq.Print() time.Sleep(9 * time.Second) fmt.Println("after 14s") tq.Print() }

協程超時管理

// start to spy on queue's time-out data and throw it
func
(q *Queue) StartTimeSpying() { fmt.Println("time supervisor starts") go q.startTimeSpying() } // detail of StartTimeSpying function func (q *Queue) startTimeSpying() error { var err = make(chan string, 0) go func(queue *Queue, er chan string) { fmt.Println("start time spying, data in the queue can stay for " + q.ExpireAfter.String()) for { if queue.timeSpy == false { err <- "spying routine stops because: queue's timeSpy is false, make sure the queue is definition by q=TimeQueue(time.Duration,int)" return } select { case <-queue.flag: fmt.Println("time spy executing stops") return default: fmt.Print() } ok,er:=queue.timingRemove() if er!=nil{ err <- er.(errorx.Error).StackTrace() } if ok { time.Sleep(queue.timeStep) } } }(q, err) select { case msg := <-err: fmt.Println("time spy supervisor accidentally stops because: ",msg) return errorx.NewFromString(msg) case <-q.flag: fmt.Println("time spy supervisor stops") return nil } } // remove those time-out data func (q *Queue) timingRemove() (bool,error) { if len(q.Data) <1 { return true,nil } head, index, er := q.THead() if er != nil { return false, errorx.Wrap(er) } if index < 0 { return false, errorx.NewFromString("queue'length goes 0") } now := time.Now().Unix() created := time.Unix(head.CreatedAt, 0) //fmt.Println("now:",now) //fmt.Println("expire:",created.Add(q.ExpireAfter).Unix()) if created.Add(q.ExpireAfter).Unix() < now { // out of time _,_,e := q.TPop() if e!=nil { return false, errorx.Wrap(e) } if len(q.Data) >0 { return q.timingRemove() }else{ return true,nil } } else{ return true ,nil } }