新姿勢!Redis中呼叫Lua指令碼以實現原子性操作
背景:有一服務提供者Leader,有多個訊息訂閱者Workers。Leader是一個排隊程式,維護了一個使用者佇列,當某個資源空閒下來並被分配至佇列中的使用者時,Leader會向訂閱者推送訊息(訊息帶有唯一標識 ID ),訂閱者在接收到訊息後會進行特殊處理並再次推往前端。
問題:前端只需要接收到一條由Worker推送的訊息即可,但是如果Workers不做訊息重複推送判斷的話,會導致前端收到多條訊息推送,從而影響正常業務邏輯。
方案一(未通過)
在Worker接收到訊息時,嘗試先從redis快取中根據訊息的 ID 獲取值,有以下兩種情況:
-
如果值不存在,則表示當前這條訊息是第一次被推送,可以執行繼續執行推送程式,當然,不要忘了將當前訊息 ID 作為鍵插入快取中,並設定一個過期時間,標記這條訊息已經被推送過了。
-
如果值存在,則表示當前這條訊息是被推送過的,跳過推送程式。
程式碼可以這麼寫:
public void waitingForMsg() { // Message Received. String value = redisTemplate.opsForValue().get("msg_pushed_" + msgId); if (!StringUtils.hasText(value)) { // 當不能從快取中讀取到資料時,表示訊息是第一次被推送 // 趕緊往快取中插入一個標識,表示當前訊息已經被推送過了 redisTemplate.opsForValue().set("msg_pushed_" + msgId, "1"); // 再設定一個過期時間,防止資料無限制保留 redisTemplate.expire("msg_pushed_" + msgId, 20, TimeUnit.SECONDS); // 接下來就可以執行推送操作啦 this.pushMsgToFrontEnd(); } } 複製程式碼
看起來似乎是沒啥問題,但是我們從redis的角度分析一下請求,看看是不是真的沒問題。
> get msg_pushed_1# 此時Worker1嘗試獲取值 > get msg_pushed_1# Worker2也沒閒著,執行了這句話,並且時間找得剛剛好,就在Worker1準備插入值之前 > set msg_pushed_1 "1"# Worker1覺得訊息沒有被推送,插入了一個值 > set msg_pushed_1 "1"# Worker2也這麼覺得,做了同樣的一件事複製程式碼
你看,還是有可能會往前端推送多次訊息,所以這個方案不通過。
再仔細想一想,出現這個問題的原因是啥?———— 就是在執行get和set命令時,沒有保持 原子性 操作,導致其他命令有機可趁,那是不是可以把get和set命令當成一整個部分執行,不讓其他命令插入執行呢?
有很多方案可以實現,例如給鍵加鎖或者新增事務可能可以完成這個操作。但是我們今天討論一下另外一種方案,在Redis中執行Lua指令碼。
方案二
我們可以看一下Redis官方文件對Lua指令碼原子性的解釋。
Atomicity of scripts
Redis uses the same Lua interpreter to run all the commands. Also Redis guarantees that a script is executed in an atomic way: no other script or Redis command will be executed while a script is being executed. This semantic is similar to the one ofMULTI /EXEC. From the point of view of all the other clients the effects of a script are either still not visible or already completed.
大致意思是說:我們Redis採用相同的Lua直譯器去執行所有命令,我們可以保證,指令碼的執行是原子性的。作用就類似於加了MULTI/EXEC。
好,原子性有保證了,那麼我們再看看編寫語法。
> eval "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 key1 key2 first second1) "key1"2) "key2"3) "first"4) "second"複製程式碼
由前至後的命令解釋(Arg 表示引數的意思 argument):
eval: Redis執行Lua指令碼的命令,後接指令碼內容及各引數。這個命令是從 2.6.0 版本才開始支援的。
1st. Arg : Lua指令碼,其中的KEYS[]和ARGV[]是傳入script的引數 。
2nd. Arg: 後面跟著的KEY個數n,從第三個引數開始的總共n個引數會被作為KEYS傳入script中,在script中可以通過KEYS[1], KEYS[2]…格式讀取,下標從1開始 。
Remain Arg: 剩餘的引數可以在指令碼中通過ARGV[1], ARGV[2]…格式讀取 ,下標從1開始 。
我們執行指令碼內容是 re
turn {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}
表示返回傳入的引數,所以我們可以看到引數被原封不動的返回了。
接著,我們再來實戰一下,在Lua指令碼中呼叫Redis方法吧。
我們可以在Lua指令碼中通過以下兩個命令呼叫redis的命令程式
-
redis.call()
-
redis.pcall()
兩者的作用是一樣的,但是程式出錯時的返回結果略有不同。
使用方法,命令和在Redis中執行一模一樣:
> eval "return redis.call('set', KEYS[1], ARGV[1])" 1 foo bar OK > eval "return redis.call('get', KEYS[1])" 1 foo "bar"複製程式碼
是不是很簡單,說了這麼多,我們趕緊來現學現賣,寫一個指令碼應用在我們的場景中吧。
> eval "if redis.call('get', KEYS[1]) == false then redis.call('set', KEYS[1], ARGV[1]) redis.call('expire', KEYS[1], ARGV[2]) return 0 else return 1 end" 1 msg_push_1 "1" 10複製程式碼
指令碼的意思和我們之前在 方案一 中寫的程式邏輯一樣,先判斷快取中是否存在鍵,如果不存在則存入鍵和其值,並且設定失效時間,最後返回0;如果存在則返回1。 PS: 如果對 if redis.call('get', KEYS[1]) == false
這裡為什麼得到的結果要與false比較的話,可以看最後的Tip。
-
執行第一次:我們發現返回值0,並且我們看到快取中插入了一條資料,鍵為
msg_push_1
、值為"1"
-
在失效前,執行多次:我們發現返回值一直為1。並且在執行第一次後的10秒,該鍵被自動刪除。
將以上邏輯遷入我們java程式碼後,就是下面這個樣子啦
public boolean isMessagePushed(String messageId) { Assert.hasText(messageId, "訊息ID不能為空"); // 使用lua指令碼檢測值是否存在 String script = "if redis.call('get', KEYS[1]) == false then redis.call('set', KEYS[1], ARGV[1]) redis.call('expire', KEYS[1], ARGV[2]) return 0 else return 1 end"; // 這裡使用Long型別,檢視原始碼可知指令碼返回值型別只支援Long, Boolean, List, or deserialized value type. DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(); redisScript.setScriptText(script); redisScript.setResultType(Long.class); // 設定key List<String> keyList = new ArrayList<>(); // key為訊息ID keyList.add(messageId); // 每個鍵的失效時間為20秒 Long result = redisTemplate.execute(redisScript, keyList, 1, 20); // 返回true: 已讀、false: 未讀 return result != null && result != 0L; } public void waitingForMsg() { // Message Received. if (!this.isMessagePushed(msgId)) { // 返回false表示未讀,接下來就可以執行推送操作啦 this.pushMsgToFrontEnd(); } }複製程式碼
Tip
這裡只是簡單的Redis中使用Lua指令碼介紹,詳細的使用方法可以參考官方文件,而且還有其他很多用法介紹。
對了,上面還有一個 坑 需要注意一下,就是關於Redis和Lua中變數的相互轉換,因為說起來囉哩囉嗦的,所以沒放在上文中,最後可以簡單說一下。
Re dis to Lua conversion table.
-
Redis integer reply -> Lua number
-
Redis bulk reply -> Lua string
-
Redis multi bulk reply -> Lua table (may have other Redis data types nested)
-
Redis status reply -> Lua table with a single ok field containing the status
-
Redis error reply -> Lua table with a single err field containing the error
-
Redis Nil bulk reply and Nil multi bulk reply -> Lua false boolean type // 這裡就是上面我們在指令碼中做是否為空判斷的時候
if redis.call('get', KEYS[1]) == false
,採用與false比較的原因。Redis的nil(類似null)會被轉換為Lua的false
Lua to Redis conversion table.
-
Lua number -> Redis integer reply (the number is converted into an integer)
-
Lua string -> Redis bulk reply
-
Lua table (array) -> Redis multi bulk reply (truncated to the first nil inside the Lua array if any)
-
Lua table with a single ok field -> Redis status reply
-
Lua table with a single err field -> Redis error reply
-
Lua boolean false -> Redis Nil bulk reply.
注意點:
Lua的Number型別會被轉為Redis的Integer型別,因此如果希望得到小數時,需要由Lua返回String型別的數字。