Redis 應用層協議解析以及在 Python 客戶端中的實現
安禹
安禹,網易遊戲高階運維工程師,主要工作方向為網易遊戲 Redis Saas 的開發與運維,也關注 Python 和 Rust 的最新進展。怕什麼真理無窮,進一寸有進一寸的歡喜。
Redis 的應用層協議設計
Redis 的通訊協議設計得非常簡單,具體可以參考 Redis Protocol specification ,簡稱 RESP,在此進行一個大致的介紹。
RESP 本身並沒有專門的欄位標記整個請求的報文長度,它的設計思路整體針對於 命令管道(Pipeline)的需求,可以很方便地將多條命令封裝在一次 tcp 報文傳送中,比如:當我們傳送一個 ` GET A ` 命令,對應的報文如下:
*2\r\n$3\r\nGET\r\n$1\r\nA\r\n
而如果通過 Pipeline 傳送 ` GET A ` 和 `GET B` 兩條命令時,並不需要什麼額外處理,僅僅是將兩條命令按順序傳送:
*2\r\n$3\r\nGET\r\n$1\r\nA\r\n*2\r\n$3\r\nGET\r\n$1\r\nB\r\n
接下來我們進行一個較為詳細完整的解析
RESP 規定了五種資料型別 :
- 簡單字串(Simple Strings):以 ` + ` 作為開頭,一般用於簡單的字串回覆,比如 ` set A B ` 這 類命令的返回報文 中的 ` OK ` ,就是封裝在簡單字串型別中。
- 錯誤(Errors):以 ` - ` 作為開頭,用於返回錯誤資訊,比如輸入了一條不存在的命令,redis 服務會返回 ` ERR unknown command 'xx' ` ,這條 錯誤資訊就封裝在錯誤型別報文中。
- 整數(Integers) :以 ` : ` 開頭,用 於返回整數結果 ,比如 ` LLEN ` 命 令,當我們用它統計某個列表長度時,返回的數字就封裝在整數型別中。
- 二進位制安全字串(Bulk Strings): 以 ` $ ` 作 為開頭,用於承載攜帶資料,是最重要最常用的型別,當你向 Redis 傳送命令時,命令中的字串會被封裝在二進位制安全字串,比如開篇的例 子中 ` GET ` 就被封裝成了 ` $3\r\nGET\r\n ` 這樣一個二進位制安全字串報文,而一個正常 GET 命令的返回報文同樣是 一個二進位制安全字串。
- 陣列(Arrays):以 ` * ` 開頭,同樣是最重要最常用的型別,開篇的例子中 ` GET A ` 命令中的兩個字元 串 ` GET ` 和 ` A ` 分別被封裝成了 ` $3\r\nGET\r\n ` 和 ` $1\r\nA\r\n ` ,然後被進一步封裝成了一個數組型別 ` *2\r\n... ` ,我們對 Redis 所有傳送的命令都會被這樣封裝, 先是子字串被封裝成二進位制安全字串,然後二進位制安全字串被封裝成陣列發往服務端。
以下是更為詳細的示例:
簡單字串(Simple Strings)
簡單字串的回覆通常是固定的,可以類似的理解為靜態字串,這種通常表達一種確定的、可預期的結果,比如最常見的就 是 ` OK ` 和 事務中返回的 `QUEUED`
127.0.0.1:6379> set a b
OK
127.0.0.1:6379>
` OK ` 這 個字串不會有任何改變,也不需要攜帶可變的資訊,它僅僅是標識這個操作成功了,不會包含其他任何可變的資料。它 以 ` + ` 為開頭, 以 ` \r\n ` 為 結尾,比 如 ` OK ` 的 報文就 是 ` +OK\r\n `
錯誤(Errors)
錯誤與簡單字串非常相似,不同的是它以 ` - ` 作 為開頭,其他並沒有什麼不同,它僅僅是顯示一個錯誤資訊,而這個資訊在協議上並沒有什麼強制的規範,可以寫入任意字串資訊,當然錯誤字串中是不能寫 ` \r\n ` 的
比如命令不存在的報 錯 ` ERR unknown command 'tt' ` 封 裝結果就是
`-ERR unknown command 'tt'\r\n`
整數(Integers)
整數型別也很簡單,和前兩種不同的是,它是可以攜帶資料的,型別為有符號64位整數,用於一些返回整數型別的命令,目前文件顯示,會返回整數的有以下這些命令,
- SETNX
- DEL
- EXISTS
- INCR
- INCRBY
- DECR
- DECRBY
- DBSIZE
- LASTSAVE
- RENAMENX
- MOVE
- LLEN
- SADD
- SREM
- SISMEMBER
- SCARD
當然 Redis 的官方文件一直都不是很靠譜,RESP很久沒更新了,目前來看至少用於 Stream 功能的 XLEN 命令和用於 HyperLog 功能的 PFCOUNT命令也是要返回整數型別。
這種資料的封裝也很簡單,使 用 `:` 作為開頭, `\r\n` 作為結尾,中間為要填充的數字,整數型別可以用來標識布林型別,比如在 EXISTS 命令中, `:1\r\n` 表示 true, `:0\r\n` 表示 fals e
因為整數型別使用64位有符號整數,所以也可以表示負數,比如對一個不存在的 key 使用 TTL 命令時,會返回 -2
10.200.27.30:6379> EXISTS AAA
(integer) 0 # key AAA 不存在
10.200.27.30:6379> TTL AAA
(integer) -2 # 對 AAA 使用 TTL 命令返回 -2
二進位制安全字串(Bulk Strings)
二進位制安全字串使用如下方法進行編碼:
1 . 用 `$` 字元作為開頭,後接實際字串的位元組數,再新增 `\r\n` 來表示資料長度。
2. 要傳送的實際字串資料。
3. 再新增一個 `\r\n` 作為結尾
4. 單個二進位制安全字串資料位元組數不超過 512MB
舉例,要封裝 `Hello,world` 字串,字串位元組數為 11,所以使用 `$11\r\n` 作為開頭,封裝結果如下:
$11\r\nHello,world\r\n
空字串可以使用如下表示:
$0\r\n\r\n
二進位制安全字串還可用來表示 NULL
$-1\r\n
比如當我們嘗試 GET 一個不存在的 key 時, 就會返回 `$-1\r\n` 以下我們使用原 生 socket 和 Redis 服務端互動:
$ python
Python 2.7.9 (default, Mar 1 2015, 12:57:24)
[GCC 4.9.2] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import socket
>>> res = socket.getaddrinfo("127.0.0.1", 6379, 0, socket.SOCK_STREAM)
>>> res
[(2, 1, 6, '', ('127.0.0.1', 6379))]
>>> family, socktype, proto, canonname, socket_address = res[0]
>>> sock = socket.socket(family, socktype, proto)
>>> sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
>>> sock.connect(socket_address)
>>> # 傳送 EXISTS AAA 命令
>>> sock.sendall(b"*2\r\n$6\r\nEXISTS\r\n$3\r\nAAA\r\n")
>>> sock.recv(512)
':0\r\n'
>>> # 返回整數型別 0,表示 false,即 key AAA 不存在
>>> # 傳送 GET AAA 命令
>>> sock.sendall(b"*2\r\n$3\r\nGET\r\n$3\r\nAAA\r\n")
>>> sock.recv(512)
'$-1\r\n'
>>> # 返回一個長度為 -1 的二進位制安全字串,表示 key AAA 對應的 value 不存在
>>> # 對不存在的 key 使用 TTL
>>> sock.sendall(b"*2\r\n$3\r\nTTL\r\n$3\r\nAAA\r\n")
>>> sock.recv(512)
':-2\r\n'
>>> # 返回整數 -2
陣列(Arrays)
陣列型別用於傳送 Redis 命令,也同樣用於一些命令的返回,它使用 `*` 作為開頭,後接資料元素個數,再接 `\r\n` ,之 後即可放入對應元素,元素可以是任意型別,當然也可以是一個數組,陣列可以包含另一個數組。
比如包含兩個整形 1 的陣列
*2\r\n:1\r\n:1\r\n
當需要一個 NULL 陣列時,處理方式與二進位制安全字串類似
*-1\r\n
比如 BLPOP 命令超時後,redis 服務端就會返回一個 NULL 陣列,即 `*-1\r\n`
協議上對 Pipeline 的實現
如文章開頭所說,RESP 設計思路一開始就充分考慮了 Pipeline 的需求,這是因為記憶體速度遠高於網路 IO,還能大大降低 IO 的讀寫次數,使用 Pipeline 是挖掘 Redis 效能最具有價效比的方法。協議上對於 Pipeline 的實現也非常直接了當。
實現的方式就是將多條報文直接連在一起傳送,沒有其他任何額外資訊傳送。
還是用原生 socket 傳送報文來舉例:
>>> CMD_1 = b"*2\r\n$3\r\nGET\r\n$3\r\nAAA\r\n" # 命令1 GET AAA
>>> CMD_2 = b"*2\r\n$3\r\nGET\r\n$3\r\nAAB\r\n" # 命令2 GET AAB
>>> sock.sendall(CMD_1 + CMD_2) # 直接將兩條報文一起傳送
>>> sock.recv(512)
'$-1\r\n$-1\r\n' # 可見兩條命令的回覆也被一起發回
協議上對 Pipeline 的實現就是這麼簡單。
redis-py 對於協議的實現
redis-py 對於協議解析的實現
redis-py是目前使用最多的 Python 語言下的 Redis 客戶端工具庫,它對於 Redis 協議解析在 redis/connection.py檔案的 HiredisParser 和 PythonParser 物件(3.0.1 版本)中實現。
程式會根據當前包的安裝情況,如果發現安裝了 0.1.3 版本以上的 hiredis-py,就會 import hiredis 進行網路 IO 讀取和報文解析。
hiredis-py是 Redis 官方提供的 Python 語言 Redis 客戶端驅動,底層使用 C 編寫,理論上擁有更好的效能,但是也要注意,如果使用 Pypy 的話,可能會出現對於 hiredis-py的相容性問題。
我們在此主要看使用 Python 編寫的 PythonParser 物件的實現,主要程式碼在 PythonParser 的 read_response 方法,程式碼很短,為了方便展示,剔除了一些型別檢查和錯誤處理的程式碼:
def read_response(self):
# 從 socket buffer 物件中讀取服務端回覆,讀到 \r\n 為止
response = self._buffer.readline()
# 如果讀取的內容為 空位元組串 則說明連線已經斷開
if not response:
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
# 取開頭的一位元組,確定報文型別
byte, response = byte_to_chr(response[0]), response[1:]
# 如果報文型別未知報協議錯誤
if byte not in ('-', '+', ':', '$', '*'):
raise InvalidResponse("Protocol Error: %s, %s" % (str(byte), str(response)))
# 處理錯誤型別報文
if byte == '-':
response = nativestr(response)
# 處理一些常見錯誤報文,注意這只是處理一些約定俗成的錯誤內容,和協議規範並無關係
# 具體可參見 BaseParser 物件
error = self.parse_error(response)
return error
# 處理簡單字串
elif byte == '+':
pass
# 處理整形型別
elif byte == ':':
# 將回復轉成 64 位長整形
response = long(response)
# 處理二進位制安全字串
elif byte == '$':
# 獲取二進位制字串長度
length = int(response)
# 如果長度為 -1,說明是個 NULL,直接返回 None
if length == -1:
return None
# 讀取對應長度的報文
response = self._buffer.read(length)
# 處理陣列
elif byte == '*':
length = int(response)
# 處理空陣列的情況
if length == -1:
return None
# 迴圈遞迴獲取陣列中元素,如果是 Python3,這裡的 xrange 實際上被定位到了 range 函式 response = [self.read_response() for i in xrange(length)]
# 將回復報文轉碼為 str
if isinstance(response, bytes):
response = self.encoder.decode(response)
return response
redis-py 對於協議報文構造的實現
redis-py對協議報文的構造完全由 Python 編寫,主要程式碼在 Connection 物件 的 pack_command 方法,程式碼同樣不長,因為只需要處理二進位制安全字串和陣列兩種,程式碼更簡單
def pack_command(self, *args):
output = []
# 這一部份主要是為了相容類似於 `config get XXXX` 的命令
# 因為實現中將 `config get` 作為一個命令,但是在封裝成二進位制安全字串時
# 依然要作為兩個字串,所以在此進行分割,Token 物件只是一個快取
command = args[0]
if ' ' in command:
args = tuple(Token.get_token(s)
for s in command.split()) + args[1:]
else:
args = (Token.get_token(command),) + args[1:]
# SYM_STAR = b'*', SYM_DOLLAR = b'$', SYM_CRLF = b'\r\n', SYM_EMPTY = b''
# 構造陣列頭部,比如 `*2\r\n` buff = SYM_EMPTY.join((SYM_STAR, str(len(args)).encode(), SYM_CRLF))
# 程式碼對這個值寫死為 6000 buffer_cutoff = self._buffer_cutoff
# 對各引數進行編碼,編碼細節可見 Encoder 物件 for arg in imap(self.encoder.encode, args):
# 為避免單個命令總的位元組數過長,導致生成一個極長的字串,
# 當 buff 位元組數或 arg 總數超過 6000 時,將其分塊在列表的多個字串中
# 不過還是有些侷限,比如嘗試 GET 一個名稱極長的 key 時,最終的結果
# 不會把 key 名分割
if len(buff) > buffer_cutoff or len(arg) > buffer_cutoff:
buff = SYM_EMPTY.join(
(buff, SYM_DOLLAR, str(len(arg)).encode(), SYM_CRLF))
output.append(buff)
output.append(arg)
buff = SYM_CRLF
else:
buff = SYM_EMPTY.join(
(buff, SYM_DOLLAR, str(len(arg)).encode(), SYM_CRLF, arg, SYM_CRLF))
output.append(buff)
return output
使用 pack_command 構造報文:
$ python Python 2.7.9 (default, Mar 1 2015, 12:57:24) [GCC 4.9.2] on linux2 Type "help", "copyright", "credits" or "license" for more information. >>> from redis.connection import Connection >>> conn = Connection() >>> conn.pack_command("GET", "AAA")['*2\r\n$3\r\nGET\r\n$3\r\nAAA\r\n']
redis-py-cluster 與 Pipeline
redis-py-cluster 對於 Redis Cluster 的 Pipeline 實現
和單分片的 Redis 服務相比,redis-cluster 有著不少的侷限,主要體現在跨分片資料計算,比如 SDIFF、BRPOPLPUSH 這種需要以多個 key 為引數的命令在使用上限制很大,還有就是在事務和 Pipeline 上的差別。
在 Redis 中,事務和 Pipeline 是充分解耦合的,但很多實現確實會把兩者結合使用,比如 redis-py中,Pipeline 就預設開啟了事務(詳見 client.pyRedis 物件的 pipeline 方法)。
但在 redis-py-cluster (3.0.1 版本)的 Pipeline 實現中,完全停止了對事務的支援,甚至嘗試通過呼叫 StrictClusterPipeline 物件例項的 multi 方法時,會直接丟擲 ` RedisClusterException("method multi() is not implemented") ` 。
在此我們可以解析 redis-py-cluster 對於 Pipeline 的實現方法,來檢視它的底層原理。主要實現在 [ rediscluster/pipeline.py 的 StrictClusterPipeline 物件 send _cluster_ commands 方法,感謝這個庫的作者 Grokzen ,這個函式的註釋比程式碼行數還多。
def send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=True):
# StrictClusterPipeline 物件將 pipeline 中的命令集合存在一個列表中,
# 作為 stack 引數傳入
# 其實這一行我沒太看懂,看起來是進行了排序,但是 position 引數預設是 None
# 也沒有對這個引數進行修改的程式碼,此處存疑
attempt = sorted(stack, key=lambda x: x.position)
# 用於將命令分類,key 為節點名,value 為要執行的命令列表
nodes = {}
# 對命令進行分類,逐個判斷對應節點,並存儲至 nodes
for c in attempt:
# 獲取引數所在的 slot,對於含有多個 key 的命令,取第一個 key
slot = self._determine_slot(*c.args)
# 獲取 slot 對應的節點
node = self.connection_pool.get_node_by_slot(slot)
# 此處給 node 物件加一個 name 引數,引數值為 “節點ip:port”,其實應該在node物件 # 建立時就應該有這個引數了,原註釋也說這是一個“小小的 hack” self.connection_pool.nodes.set_node_name(node)
# 這個 node_name 就是上一行執行的結果,否則不會有 `name` 這個 key
node_name = node['name']
# 此處將命令逐個append 到 nodes 中對應節點的 value 中
# 並對每個會涉及的節點建立一個連線
if node_name not in nodes:
nodes[node_name]=NodeCommands(self.parse_response, self.connection_pool.get_connection_by_node(node))
nodes[node_name].append(c)
# 取出命令,逐個節點發送所有命令
node_commands = nodes.values()
for n in node_commands:
n.write()
# 逐個節點等待命令返回,其實這一部分有很大優化空間
# 理論上使用 select 可以提升不少效能,當節點很多時,
# 這樣收發的效率其實很低
for n in node_commands:
n.read()
# 釋放連線
for n in nodes.values():
self.connection_pool.release(n.connection)
# 當出現錯誤進行重試
attempt = sorted([c for c in attempt if isinstance(c.result, ERRORS_ALLOW_RETRY)], key=lambda x: x.position)
if attempt and allow_redirections:
# 原註釋中,作者認為出現了錯誤需要重試時,應該將正確性提升為最優先要求,
#為了重試可以犧牲一些效能
self.connection_pool.nodes.increment_reinitialize_counter(len(attempt))
for c in attempt:
try:
# 逐個命令逐個節點進行收發,不再一口氣傳送接收所有命令和回覆
c.result = super(StrictClusterPipeline, self).execute_command(*c.args, **c.options)
except RedisError as e:
c.result = e
# 其實這個 sorted 好像依然沒什麼必要= =
# 將結果依照命令順序排序放進結果列表
response = [c.result for c in sorted(stack, key=lambda x: x.position)]
if raise_on_error:
# 如果重試後依然有錯誤,將第一個錯誤轉碼丟擲
self.raise_first_error(stack)
return response
redis-py-cluster 在 Pipeline 上的侷限性
從實現來看,redis-py-cluster 在 Pipeline 的實現上徹底拋棄了對事務的直接支援,當然如果一定要用事務的話,比如說可以確定事務中操作的 key 都在一個 slot 中(對同個 key 多次操作或者使用自定義 tag),還是可以直接使 用 ` execute_command("MULTI") ` 和 ` execute_command("EXEC") ` 來通過命令進行對於單個 slot 的事務操作。
目前來看最主要的侷限性還是在於操作多個分片時的資料安全問題,從實現上來看,當使用 Pipeline 進行的操作涉及多個節點的話,有可能在某些節點成功但是在某些節點失敗,對於資料的安全性可能是一個很大的隱患,當出現了這種情況,命令的再次重試也是一個比較麻煩的問題,redis-py-cluster 實現的重試可能並不能滿足所有需求。
次要的問題就是效能上並不能發揮出 Redis Cluster 最大效能,因為實現的收發邏輯比較簡陋,再加上出錯時比較低效的重試方式,Pipeline 中命令涉及的節點越多,Pipeline 對效能的提升就有可能越不明顯。這些都是在使用 redis-py-cluster 時應該注意的問題。
RESP3 :redis 團隊畫的大餅
如前文所示,RESP2 非常的簡單,有些地方甚至是有點簡陋甚至混亂,用 RESP3 文件中原話來說
RESP3 abandons the confusing wording of the second version of RESP
比如用 ` $-1\r\n ` 表示 NULL,用 ` *-1\r\n ` 表 示空列表,錯誤型別直接是一個字串,協議上對錯誤格式沒有詳細規範這些等等,都在 RESP3 的文件中也有提及,並且作者還列出了更多的缺點。
在此我們沒有必要花費太多時間去了解 RESP3 的具體規範,畢竟這份協議還沒有一個初具規模的具體實現,但是我們可以通過這份規範來嘗試推測一下 Redis 團隊將來可能會推出的新的功能。
可能會出現更好用的統計集合相關的命令
RESP3 中添加了浮點型和大數(超過64位)的支援, 形似 `,1.23\r\n` ,我推測可能會出現一些比較好用的數字統計功能,比如對一個 LIST 中的數字求平均數或方差標準差,對一個 Sorted SET 中的資料求平均權重,甚至可能支援浮點型的權重值,浮點型權重值是一個相當棒的功能。
而且 RESP3 中還添加了對空值的實現,形如 `_\r\n` ,那有沒有可能可以在 Sorted SET 支援元素的權重值為空?比如對於一個新的元素,它的權重值未知,權重值置為 0 又會汙染計算結果,那現在有了標準的空值,是否可以對新元素的權重置空呢?從協議上來說這並不是不可能實現。
可能會出現更為規範的錯誤型別返回
RESP3 提供了新的錯誤型別 Blob error,和 RESP2 的錯誤型別相比,它提供了二進位制安全的錯誤資訊,和 Bulk String 非常 像,形如 ` !21\r\nSYNTAX invalid syntax\r\n ` ,這對於 Redis 驅動庫是一個非常好的訊息,這意味這錯誤資訊更為詳細,更為可讀,更重要的是更為規範。
讓我們看下 redis-py 中對於 RESP2 的錯誤歸類:
EXCEPTION_CLASSES = { 'ERR': { 'max number of clients reached': ConnectionError }, 'EXECABORT': ExecAbortError, 'LOADING': BusyLoadingError, 'NOSCRIPT': NoScriptError, 'READONLY': ReadOnlyError, }
可以說是相當簡陋了。
相比之下,RESP3 的二進位制安全錯誤型別非常值得期待,這對於將來各種 Redis 庫甚至於 Redis Cluster 都有重大意義,Redis Cluster 的實現中大量使用 MOVED 錯誤,新的錯誤型別或許意味著更加強大的重定向功能,這也許能催生出更加可用的 Redis 中介軟體。
Redis 可能會成為更泛用的快取伺服器
在文件的 TODO 中,Redis 團隊提出了 一個 `Document streaming of big strings` , 這意味著 Redis 將來可能會讓字串型別突破 512MB 的限制,這也使得 Redis 能適用於更多的場景,比如大檔案快取伺服器,這對於 CDN 之類的服務可能有很大的作用。
Redis 目前的協議規範和實現都有自己的亮點,也有這樣那樣的缺點,通訊協議作為一個服務基礎中的基礎,必然還會不斷的演變,就算將來 Redis 過時了,RESP 作為一個協議依然可能會被繼續廣泛使用。
本文只是一個大概的描述,很多細節還需要從原始碼中細摳,第四部份則是筆者不負責任的開腦洞,博君一笑爾。
關注我們,獲一手遊戲運維方案