1. 程式人生 > >golang 實現一種環形佇列,及週期任務

golang 實現一種環形佇列,及週期任務

一、環形佇列

環形佇列不同語言有很多種不同的實現,不過大部分都比較複雜。

在使用golang實踐生產者消費者模型時,發現了一種變相的環形佇列,程式碼比“常規的”環形佇列簡單的多,解決2個問題:
1、生產者消費者間資料傳遞;
2、記憶體空間預申請,避免頻繁的動態記憶體申請釋放帶來記憶體碎片以及效能損耗。

package main

import (
	"fmt"
	"net"
	"time"
)

const (
	BUFF_MAX = 100
	BUFF_LEN = 1500
)

var (
	buff   [BUFF_MAX][BUFF_LEN]byte
	buffCh = make(chan int, BUFF_MAX-2) // chan 的長度要比buff少2,否則生產者會因為太快的寫入,把未被消費者處理的緩衝區覆蓋掉
)

func listenUdp(ip, port string) (*net.UDPConn, error) {
	addr, err := net.ResolveUDPAddr("udp4", ip+":"+"port")
	if err != nil {
		return nil, err
	}

	socket, err := net.ListenUDP("udp4", addr)
	if err != nil {
		return nil, err
	}
	return socket, nil
}

func connectUdp(ip, port string) (*net.UDPConn, error) {
	addr, err := net.ResolveUDPAddr("udp4", ip+":"+"port")
	if err != nil {
		return nil, err
	}

	socket, err := net.DialUDP("udp4", nil, addr)
	if err != nil {
		return nil, err
	}
	return socket, nil
}

func main() {
	go func() {
		sock, _ := listenUdp("localhost", "5000")
		defer sock.Close()

		index := 0
		for {
			n, _ := sock.Read(buff[index][:])
			if n > 0 {
				buffCh <- ((index << 16) | n)
				index++
				if index >= BUFF_MAX { // 模擬換行佇列,從頭開始複用buff
					index = 0
				}
			}
		}
	}()

	go func() {
		sock, _ := connectUdp("localhost", "5000")
		defer sock.Close()

		for {
			data := <-buffCh
			index, n := data>>16, data&0xffff
			fmt.Println(index, n) // 現在可以處理 buff[index] 中的資料了,有效的資料長度 n
			time.Sleep(time.Second) // 模擬處理每包資料需要耗費的時間
		}
	}()

}

程式碼說明:
1、第一個goroutine 生產者,第二個是消費者;
2、buffCh中傳遞了可以訪問的資料在buff中的index和長度,實際使用中用什麼chan以及什麼方式傳遞這些內容可以根據需求變,比如傳遞buff[index][:];
3、特別要注意chan的長度要比buff至少少2。否則在生產者填充資料的時候,可能當前被填充的緩衝區正在被消費者使用。

藉助於golang的chan的順序性,可以非常簡單的實現生產者和消費者傳遞資料,也避免了常規環形佇列複雜的指標和互斥操作。

###二、週期任務
以上面udp socket處理為例,有時需要消費者可以比較“平穩的”處理包資料,即在一個時間段內處理n包,不要太快。比如流視訊場景,平穩的將報文投遞到上層,可以讓視訊流更順暢,而且也可以讓接收端有機會對亂序、丟包做出一些處理。

一般的思路是:
1、啟動一個迴圈定時器;
2、超時後處理n包資料。

程式碼示例(對上面的消費者修改):

	go func() {
		const PKG_MAX = 10
		sock, _ := connectUdp("localhost", "5000")
		defer sock.Close()
		t := time.NewTimer(time.Millisecond * 1000)
		for {
			for i := 0; i < PKG_MAX; i++ {
				data := <-buffCh
				index, n := data>>16, data&0xffff
				fmt.Println(index, n) // 現在可以處理 buff[index] 中的資料了,有效的資料長度 n
				time.Sleep(time.Millisecond * 50) // 模擬處理每包資料需要耗費的時間
			}
			<-t.C
			t.Reset(time.Millisecond * 1000)
		}
	}()

上面程式碼的意圖是,每秒傳送10包資料,如果報文處理的快,那就等定時器超時;如果報文處理的慢,那定時器相當於失效了(每次執行到 <-t.C 時沒有阻塞)。

***需要注意是,定時器粒度越小越不準,波動越大。***上面的設定,如果改成每100ms發一包,會產生一些波動,而且效能也會下降不少。有興趣的可以自己測測。

再演變的話,這個模型需要三個goroutine就可以實現一個流量穩定的rudp:

  • routineA:負責接收udp報文,將報文通過chan1傳給routineB;
  • routineB:
    • 對亂序報文重排序、對丟包報文考慮重傳(自己實現);
    • 將整理好的報文通過chan2傳給routineC;
  • routineC:就像上面程式碼一樣,定時定量把報文投遞到應用層。

不得不感嘆go語言的強大,可以想想上面的功能如果用C/C++實現的話,會複雜多少!