使用 golang 寫一個 redis-cli
redis的客戶端(redis-cli)和服務端(redis-server)的通訊是建立在tcp連線之上, 兩者之間資料傳輸的編碼解碼方式就是所謂的redis通訊協議。所以,只要我們的redis-cli實現了這個協議的解析和編碼,那麼我們就可以完成所有的redis操作。
redis 協議設計的非常易讀,也易於實現,關於具體的redis通訊協議請參考:通訊協議(protocol)。後面我們在實現這個協議的過程中也會簡單重複介紹一下具體實現
1. 建立tcp連線
redis客戶端和服務端的通訊是建立tcp連線之上,所以第一步自然是先建立連線
package main import ( "flag" "log" "net" ) var host string var port string func init() { flag.StringVar(&host, "h", "localhost", "hsot") flag.StringVar(&port, "p", "6379", "port") } func main() { flag.Parse() tcpAddr := &net.TCPAddr{IP: net.ParseIP(host), Port: port} conn, err := net.DialTCP("tcp", nil, tcpAddr) if err != nil { log.Println(err) } defer conn.Close() // to be continue } 複製程式碼
後續我們傳送和接受資料便都可以使用conn.Read()
和conn.Write()
來進行了
2. 傳送請求
傳送請求第一個第一個位元組是"*",中間是包含命令本身的引數個數,後面跟著"\r\n" 。之後使用"$"加引數位元組數量並使用"\r\n"結尾,然後緊跟引數內容同時也使用"\r\n"結尾。如執行SET key liangwt
客戶端傳送的請求為"*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$7\r\nliangwt\r\n"
注意:
- 命令本身也作為協議的其中一個引數來發送
- \r\n 對應byte的十進位制為 13 10
我們可以使用telnet測試下
wentao@bj:~/github.com/liangwt/redis-cli$ telnet 127.0.0.1 6379 Trying 127.0.0.1... Connected to 127.0.0.1. Escape character is '^]'. *3 $3 SET $3 key $7 liangwt +OK 複製程式碼
先暫時忽略服務端的回覆,通過telnet我們可以看出請求協議非常簡單,所以對於請求協議的實現不做過多的介紹了,直接放程式碼(如下使用基於字串拼接,只是為了更直觀的演示,效率並不高,實際程式碼中我們使用bytes.Buffer來實現)
func MultiBulkMarshal(args ...string) string { var s string s = "*" s += strconv.Itoa(len(args)) s += "\r\n" // 命令所有引數 for _, v := range args { s += "$" s += strconv.Itoa(len(v)) s += "\r\n" s += v s += "\r\n" } return s } 複製程式碼
在實現了對命令和引數進行編碼之後,我們便可以通過conn.Write()
把資料推送到服務端
func main() { // .... req := MultiBulkMarshal("SET", "key", "liangwt") _, err = conn.Write([]byte(req)) if err != nil { log.Fatal(err) } // to be continue } 複製程式碼
3. 獲取回覆
我們首先實現通過tcp獲取服務端返回值,就是上面提到過的conn.Read()
。
func main() { // .... p := make([]byte, 1024) _, err = conn.Read(p) if err != nil { log.Fatal(err) } // to be continue } 複製程式碼
4. 解析回覆
我們拿到p之後我們就可以解析返回值了,redis服務端的回覆是分為幾種情況的
- 狀態回覆
- 錯誤回覆
- 整數回覆
- 批量回復
- 多條批量回復
我們把前四種單獨看作一組,因為他們都是單一型別的返回值
我們把最後的多條批量回復看成單獨的一組,因為它是包含前面幾種型別的混合型別。而且你可以發現它和我們的請求協議是一樣的
也正是基於以上的考慮我們建立兩個函式來分別解析單一型別和混合型別,這樣在解析混合型別中的某一型別時就只需要呼叫單一型別解析的函式即可
在解析具體協議前我們先實現一個是讀取到\r\n為止的函式
func ReadLine(p []byte) ([]byte, error) { for i := 0; i < len(p); i++ { if p[i] == '\r' { if p[i+1] != '\n' { return []byte{}, errors.New("format error") } return p[0:i], nil } } return []byte{}, errors.New("format error") } 複製程式碼
第一種狀態回覆:
狀態回覆是一段以 "+" 開始, "\r\n" 結尾的單行字串。如 SET 命令成功的返回值:"+OK\r\n"
所以我們判斷第一個字元是否等於 '+' 如果相等,則讀取到\r\n
func SingleUnMarshal(p []byte) ([]byte, int, error) { var ( result []byte errerror length int ) switch p[0] { case '+': result, err = ReadLine(p[1:]) length = len(result) + 3 } return result, length, err } 複製程式碼
注:我們在返回實際回覆內容的同時也返回了整個回覆的長度,方便後面解析多條批量回復時定位下一次的解析位置
第二種錯誤回覆:
錯誤回覆的第一個位元組是 "-", "\r\n" 結尾的單行字串。如執行SET key
缺少引數時返回值:"-ERR wrong number of arguments for 'set' command\r\n"
錯誤回覆和狀態回覆非常相似,解析方式也是一樣到。所以我們只需新增一個case即可
func SingleUnMarshal(p []byte) ([]byte, int, error) { var ( result []byte errerror length int ) switch p[0] { case '+', '-': result, err = ReadLine(p[1:]) length = len(result) + 3 } return result, length, err } 複製程式碼
第三種整數回覆:
整數回覆的第一個位元組是":",中間是字串表示的整數,"\r\n" 結尾的單行字串。如執行LLEN mylist
命令時返回 ":10\r\n"
整數回覆也和上面兩種是一樣的,只不過返回的是字串表示的十進位制整數
func SingleUnMarshal(p []byte) ([]byte, int, error) { var ( result []byte errerror length int ) switch p[0] { case '+', '-', ':': result, err = ReadLine(p[1:]) length = len(result) + 3 } return result, length, err } 複製程式碼
第四種批量回復:
批量回復的第一個位元組為 "$",接下來是字串表示的整數,它表示實際回覆的長度,之後跟著一個 "\r\n",再後面跟著的是實際回覆資料,最末尾是另一個 "\r\n"。如GET key
命令的返回值:"$7\r\nliangwt\r\n"
所以批量回復解析的實現:
- 讀取第一行得到實際回覆的長度
- 把字串型別的長度轉換成對應十進位制整數
- 從第二行開始位置往下讀對應長度
但是對於某些不存在的key,批量回復會將特殊值 -1 用作回覆的長度值, 此時我們不需要繼續往下讀取實際回覆。例如GET NOT_EXIST_KEY
返回值:"$-1", 所以我們需要對此特殊情況判斷,讓函式返回一個空物件(nil)而不是空值("")
func SingleUnMarshal(p []byte) ([]byte, int, error) { // .... case '$': n, err := ReadLine(p[1:]) if err != nil { return []byte{}, 0, err } l, err := strconv.Atoi(string(n)) if err != nil { return []byte{}, 0, err } if l == -1 { return nil, 0, nil } // +3 的原因 $ \r \n 三個字元 result = p[len(n)+3 : len(n)+3+l] length = len(n) + 5 + l } return result, length, err } 複製程式碼
思考:
為什麼redis要使用提前告知位元組數,然後往下讀取指定長度的方式,而不是直接讀取第二行到\r\n為止?
答案很明顯:此方式可以讓redis讀取返回值時不受具體的返回內容影響,在按行讀取的情況下,無論使用任何分割符都有可能導致redis在解析具體內容時把內容中的分割符當作時結尾,導致解析錯誤。
思考一下這種情況:我們SET key "liang\r\nwt"
,那麼當我們GET key
時,服務端返回值為"$9\r\nliang\r\nwt\r\n" 完全規避了value中的\r\n影響
第五種多條批量回復:
多條批量回復是由多個回覆組成的陣列,它的第一個位元組為"*", 後跟一個字串表示的整數值, 這個值記錄了多條批量回復所包含的回覆數量, 再後面是一個"\r\n"。如LRANGE mylist 0 -1
的返回值:"*3\r\n$1\r\n3\r\n$1\r\n2\r\n$1\r\n1"。
所以多條批量回復解析的實現:
- 解析第一行資料獲得字串型別的回覆數量
- 把字串型別的長度轉換成對應十進位制整數
- 按照單條回覆依次逐個解析,一共解析成上面得到的數量
在這裡我們用到了單條解析時返回的位元組長度length,通過這個長度我們可以很方便的知道下次單條解析的開始位置為上一次位置+length
在解析多條批量回復時需要注意兩點:
第一,多條批量回復也可以是空白的(empty)。例如執行LRANGE NOT_EXIST_KEY 0 -1
服務端返回值"*0\r\n"。此時客戶端返回的應該空陣列[][]byte
第二,多條批量回復也可以是無內容的(null multi bulk reply)。例如執行BLPOP key 1
服務端返回值"*-1\r\n"。此時客戶端返回的應該是nil
func MultiUnMarsh(p []byte) ([][]byte, error) { if p[0] != '*' { return [][]byte{}, errors.New("format error") } n, err := ReadLine(p[1:]) if err != nil { return [][]byte{}, err } l, err := strconv.Atoi(string(n)) if err != nil { return [][]byte{}, err } // 多條批量回復也可以是空白的(empty) if l == 0 { return [][]byte{}, nil } // 無內容的多條批量回復(null multi bulk reply)也是存在的, // 客戶端庫應該返回一個 null 物件, 而不是一個空陣列。 if l == -1 { return nil, nil } result := make([][]byte, l) t := len(n) + 3 for i := 0; i < l; i++ { ret, length, err := SingleUnMarshal(p[t:]) if err != nil { return [][]byte{}, errors.New("format error") } result[i] = ret t += length } return result, nil } 複製程式碼
5. 命令列模式
一個可用的redis-cli自然是一個互動式的,使用者輸入指令然後輸出返回值。在go中我們可以使用以下程式碼即可獲得一個類似的互動式命令列
func main() { // .... for { fmt.Printf("%s:%d>", host, port) bio := bufio.NewReader(os.Stdin) input, _, err := bio.ReadLine() if err != nil { log.Fatal(err) } fmt.Printf("%s\n", input) } } 複製程式碼
我們執行以上程式碼就可以實現
localhost:6379>set key liang set key liang localhost:6379>get key get key localhost:6379> 複製程式碼
結合上我們的redis傳送請求和解析請求即可完成整個redis-cli
func main() { // .... for { fmt.Printf("%s:%d>", host, port) // 獲取輸入命令和引數 bio := bufio.NewReader(os.Stdin) input, err := bio.ReadString('\n') if err != nil { log.Fatal(err) } fields := strings.Fields(input) // 編碼傳送請求 req := MultiBulkMarshal(fields...) // 傳送請求 _, err = conn.Write([]byte(req)) if err != nil { log.Fatal(err) } // 讀取返回內容 p := make([]byte, 1024) _, err = conn.Read(p) if err != nil { log.Fatal(err) } // 解析返回內容 if p[0] == '*' { result, err := MultiUnMarsh(p) } else { result, _, err := SingleUnMarshal(p) } } // .... } 複製程式碼