程序(三):程序同步——Lock(鎖)、Semaphore(訊號量)、Event(事件)
目錄
訊號量 —— multiprocess.Semaphore(瞭解)
鎖 —— multiprocess.Lock
當多個程序使用同一份資料資源的時候,就會引發資料安全或順序混亂問題。
(1)多程序搶佔輸出資源 import os import time import random from multiprocessing import Process def work(n): print('%s: %s is running' %(n,os.getpid())) time.sleep(random.random()) print('%s:%s is done' %(n,os.getpid())) if __name__ == '__main__': for i in range(3): p=Process(target=work,args=(i,)) p.start() (2)使用鎖維護執行順序 # 由併發變成了序列,犧牲了執行效率,但避免了競爭 import os import time import random from multiprocessing import Process,Lock def work(lock,n): lock.acquire() print('%s: %s is running' % (n, os.getpid())) time.sleep(random.random()) print('%s: %s is done' % (n, os.getpid())) lock.release() if __name__ == '__main__': lock=Lock() for i in range(3): p=Process(target=work,args=(lock,i)) p.start()
上面這種情況雖然使用加鎖的形式實現了順序的執行,但是程式又重新變成串行了,這樣確實會浪費了時間,卻保證了資料的安全。
接下來,我們以模擬搶票為例,來看看資料安全的重要性。
(1)多程序同時搶購餘票 #檔案db的內容為:{"count":1} #注意一定要用雙引號,不然json無法識別 #併發執行,效率高,但競爭寫同一檔案,資料寫入錯亂 from multiprocessing import Process,Lock import time,json,random def search(): dic=json.load(open('db')) print('\033[43m剩餘票數%s\033[0m' %dic['count']) def get(): dic=json.load(open('db')) time.sleep(0.1) #模擬讀資料的網路延遲 if dic['count'] >0: dic['count']-=1 time.sleep(0.2) #模擬寫資料的網路延遲 json.dump(dic,open('db','w')) print('\033[43m購票成功\033[0m') def task(): search() get() if __name__ == '__main__': for i in range(100): #模擬併發100個客戶端搶票 p=Process(target=task) p.start() (2)使用鎖來保證資料安全 #檔案db的內容為:{"count":5} #注意一定要用雙引號,不然json無法識別 #併發執行,效率高,但競爭寫同一檔案,資料寫入錯亂 from multiprocessing import Process,Lock import time,json,random def search(): dic=json.load(open('db')) print('\033[43m剩餘票數%s\033[0m' %dic['count']) def get(): dic=json.load(open('db')) time.sleep(random.random()) #模擬讀資料的網路延遲 if dic['count'] >0: dic['count']-=1 time.sleep(random.random()) #模擬寫資料的網路延遲 json.dump(dic,open('db','w')) print('\033[32m購票成功\033[0m') else: print('\033[31m購票失敗\033[0m') def task(lock): search() lock.acquire() get() lock.release() if __name__ == '__main__': lock = Lock() for i in range(100): #模擬併發100個客戶端搶票 p=Process(target=task,args=(lock,)) p.start()
總結:
加鎖可以保證多個程序修改同一塊資料時,同一時間只能有一個任務可以進行修改,即序列的修改,沒錯,速度是慢了,但犧牲了速度卻保證了資料安全。
雖然可以用檔案共享資料實現程序間通訊,但問題是:
1.效率低
2.需要自己加鎖處理
為此mutiprocessing模組為我們提供了基於訊息的IPC通訊機制:佇列和管道。
1 佇列和管道都是將資料存放於記憶體中
2 佇列又是基於(管道+鎖)實現的,可以讓我們從複雜的鎖問題中解脫出來,
我們應該儘量避免使用共享資料,儘可能使用訊息傳遞和佇列,避免處理複雜的同步和鎖問題,而且在程序數目增多時,往往可以獲得更好的可獲展性。
訊號量 —— multiprocess.Semaphore(瞭解)
訊號量介紹Semaphore
●互斥鎖同時只允許一個執行緒更改資料,而訊號量Semaphore是同時允許一定數量的執行緒更改資料 。
●假設商場裡有4個迷你唱吧,所以同時可以進去4個人,如果來了第五個人就要在外面等待,等到有人出來才能再進去玩。
●實現: 訊號量同步基於內部計數器,每呼叫一次acquire(),計數器減1;每呼叫一次release(),計數器加1.當計數器為0時,acquire()呼叫被阻塞。這是迪科斯徹(Dijkstra)訊號量概念P()和V()的Python實現。訊號量同步機制適用於訪問像伺服器這樣的有限資源。
●訊號量與程序池的概念很像,但是要區分開,訊號量涉及到加鎖的概念
#例子
from multiprocessing import Process,Semaphore
import time,random
def go_ktv(sem,user):
sem.acquire()
print('%s 佔到一間ktv小屋' %user)
time.sleep(random.randint(0,3)) #模擬每個人在ktv中待的時間不同
sem.release()
if __name__ == '__main__':
sem=Semaphore(4)
p_l=[]
for i in range(13):
p=Process(target=go_ktv,args=(sem,'user%s' %i,))
p.start()
p_l.append(p)
for i in p_l:
i.join()
print('============》')
事件 —— multiprocess.Event(瞭解)
事件介紹
●python執行緒的事件用於主執行緒控制其他執行緒的執行,事件主要提供了三個方法 set、wait、clear。
●事件處理的機制:全域性定義了一個“Flag”,如果“Flag”值為 False,那麼當程式執行 event.wait 方法時就會阻塞,如果“Flag”值為True,那麼event.wait 方法時便不再阻塞。
●clear:將“Flag”設定為False
●set:將“Flag”設定為True
#紅綠燈例項
from multiprocessing import Process, Event
import time, random
def car(e, n):
while True:
if not e.is_set(): # 程序剛開啟,is_set()的值是Flase,模擬訊號燈為紅色
print('\033[31m紅燈亮\033[0m,car%s等著' % n)
e.wait() # 阻塞,等待is_set()的值變成True,模擬訊號燈為綠色
print('\033[32m車%s 看見綠燈亮了\033[0m' % n)
time.sleep(random.randint(3, 6))
if not e.is_set(): #如果is_set()的值是Flase,也就是紅燈,仍然回到while語句開始
continue
print('車開遠了,car', n)
break
def police_car(e, n):
while True:
if not e.is_set():# 程序剛開啟,is_set()的值是Flase,模擬訊號燈為紅色
print('\033[31m紅燈亮\033[0m,car%s等著' % n)
e.wait(0.1) # 阻塞,等待設定等待時間,等待0.1s之後沒有等到綠燈就闖紅燈走了
if not e.is_set():
print('\033[33m紅燈,警車先走\033[0m,car %s' % n)
else:
print('\033[33;46m綠燈,警車走\033[0m,car %s' % n)
break
def traffic_lights(e, inverval):
while True:
time.sleep(inverval)
if e.is_set():
print('######', e.is_set())
e.clear() # ---->將is_set()的值設定為False
else:
e.set() # ---->將is_set()的值設定為True
print('***********',e.is_set())
if __name__ == '__main__':
e = Event()
for i in range(10):
p=Process(target=car,args=(e,i,)) # 建立是個程序控制10輛車
p.start()
for i in range(5):
p = Process(target=police_car, args=(e, i,)) # 建立5個程序控制5輛警車
p.start()
t = Process(target=traffic_lights, args=(e, 10)) # 建立一個程序控制紅綠燈
t.start()
print('============》')