1. 程式人生 > >使用redis設計一個簡單的分布式鎖

使用redis設計一個簡單的分布式鎖

睡眠狀態 環境 是否 lease 驗證 pri 嘗試 lee 功能

最近看了有關redis的一些東西,了解了redis的一下命令,就記錄一下:

redis中的setnx命令:

關於redis的操作命令,我們一般會使用set,get等一系列操作,數據結構也有很多,這裏我們使用最簡單的string來存儲鎖。

redis下提供一個setnx命令用來將key值設為value,類似於set功能,但是它和set是有區別的,在於後面的nx,setnx是SET if Not eXists。就是:當且僅當key值不存在的時候,將該key值設置為value。

也就是說使用setnx加入元素的時候,當key值存在的時候,是沒有辦法加入內容的。

加鎖:

下面將使用python控制redis,python控制redis的方式和命令一樣,有一個setnx(key, value)方法,通過這個方法可以實現setnx命令的效果。

首先連接redis:

文件connect.py

1 import redis
2 
3 def connect_redis():
4     return redis.Redis(host=127.0.0.1, port=6379)

然後就可以使用setnx加鎖了,key對應的value中需要填入相應的值,這裏設置Value為一個uuid值。獲取到uuid之後,將key和value填入redis,其他的客戶端想要訪問並獲取到鎖,也使用setnx方式,key值只要相同就好。那麽代碼如下:

文件operate.py

1 # 加鎖的過程
2 def acquire_lock(conn, lockname):
3 identifier = str(uuid.uuid4()) 4 conn.setnx(lock: + lockname, identifier):

這樣就通過setnx將key值寫入了。但是,這樣顯然是不合理的,客戶端可以等待一會再次獲取鎖,這樣,不至於每次請求都會出現問題。那可以設置30秒的時間,讓程序在30秒內不停的嘗試獲取鎖,知道30秒的時間過了或者由其他客戶端釋放了鎖。

所以加鎖可以變為如下:

 1 # 加鎖的過程
 2 def acquire_lock(conn, lockname, args, acquite_timeout = 30):
 3     identifier = str(uuid.uuid4())
4 end = time.time() + acquite_timeout 5 while time.time() < end: 6 # 這裏嘗試取得鎖 setnx 設置-如果不存在的時候才會set 7 if conn.setnx(lock: + lockname, identifier): 8 # 獲得鎖之後輸出獲得鎖的‘進程’號 9 print(獲得鎖:進程+ str(args)) 10 return identifier 11 return False

這樣就可以通過setnx加鎖了,這個加鎖的過程實際上就是在redis中存入了一個值,之後當其他的客戶端再次想要通過這個key加入這個值的時候,發現這個key已經存在就不往進寫值了,但是在這30秒內還會不斷嘗試的去獲取鎖,也就是不斷的嘗試寫入這個值,一旦key被刪除——也就是鎖被釋放,則該客戶端就可以競爭獲取鎖——進程寫入這個值。這種方式就像是操作系統中的自旋鎖。

自旋鎖

這裏先簡單介紹一下自旋鎖。

和自旋鎖對應的還有一種鎖,叫做對於互斥鎖。

互斥鎖:如果資源已經被占用,資源申請者只能進入睡眠狀態。

自旋鎖:自旋鎖不會引起調用者睡眠,如果自旋鎖已經被別的執行單元保持,調用者就一直循環在那裏看是否該自旋鎖的保持者已經釋放了鎖,"自旋"一詞就是因此而得名。

釋放鎖:

既然鎖可以添加,那麽也就可以釋放。釋放鎖實際上就是將redis中的數據刪除。這裏可以使用redis提供的事務流水線去執行。為了保障在執行的時候確確實實釋放的所示沒有問題的。

代碼如下,調用delete()方法——del命令刪除redis中這個key的元素。

 1 # 釋放鎖
 2 def release_lock(conn, lockname, identifier):
 3     pipe = conn.pipeline(True)
 4     lockname = lock: + lockname
 5     while True:
 6         try:
 7             pipe.watch(lockname)
 8             identifier_real = pipe.get(lockname)
 9             if identifier_real == identifier:
10                 pipe.multi()
11                 pipe.delete(lockname)
12                 pipe.execute()
13                 return True;
14             pipe.unwatch()
15             break
16         except redis.exceptions.WatchError:
17             pass
18     return False

這裏就遇到了python3中的一個坑,獲取到的數據為byte類型,所以identifier_real這個變量實際上是這個字符串之前加了個b的,例如b‘xxxx‘,所以這和傳入的identifier的類型為string,這樣比較當然會出現Fasle,所以我們在這裏將這個字符串類型轉碼:

1 pipe.get(lockname).decode()

這樣才是整整的字符串類型的字符串,最終的代碼如下:

 1 def release_lock(conn, lockname, identifier):
 2     pipe = conn.pipeline(True)
 3     lockname = lock: + lockname
 4     while True:
 5         try:
 6             pipe.watch(lockname)
 7             identifier_real = pipe.get(lockname).decode()
 8             if identifier_real == identifier:
 9                 pipe.multi()
10                 pipe.delete(lockname)
11                 pipe.execute()
12                 return True;
13             pipe.unwatch()
14             break
15         except redis.exceptions.WatchError:
16             pass
17     return False

執行:

為了驗證這個分布式鎖的正確性,可以寫一個測試的方法進行測試,先去獲取鎖,如果獲取到之後,則sleep三秒,等待3秒之後,執行釋放鎖。

模擬過程入下:

文件operate.py

1 # 模擬加鎖解鎖的過程
2 def exec_test(conn, lockname, args):
3     id = acquire_lock(conn, lockname, args)
4     if id != False:
5         print(id)
6         time.sleep(3)
7         release_lock(conn, lockname, id)

這樣我們就可以使用多進程並發訪問的方式進行對鎖進行搶占和釋放:

使用9個進程進行測試,操作方式如下:

1 from connect import connect_redis
2 from operate import acquire_lock
3 from multiprocessing import Process
4 from operate import exec_test
5 
6 if __name__ == __main__:
7     redis = connect_redis()
8     for i in range(0, 9):
9         Process(target = exec_test, args = (redis, test, i)).start()

執行結果如下所示:

技術分享圖片

註:以上python運行環境為python3.6

使用redis設計一個簡單的分布式鎖