1. 程式人生 > >程序(三):程序同步——Lock(鎖)、Semaphore(訊號量)、Event(事件)

程序(三):程序同步——Lock(鎖)、Semaphore(訊號量)、Event(事件)

目錄

鎖 —— multiprocess.Lock

訊號量 —— multiprocess.Semaphore(瞭解)

事件 —— multiprocess.Event(瞭解)


鎖 —— multiprocess.Lock

  當多個程序使用同一份資料資源的時候,就會引發資料安全或順序混亂問題。

(1)多程序搶佔輸出資源
import os
import time
import random
from multiprocessing import Process

def work(n):
    print('%s: %s is running' %(n,os.getpid()))
    time.sleep(random.random())
    print('%s:%s is done' %(n,os.getpid()))

if __name__ == '__main__':
    for i in range(3):
        p=Process(target=work,args=(i,))
        p.start()


(2)使用鎖維護執行順序
# 由併發變成了序列,犧牲了執行效率,但避免了競爭
import os
import time
import random
from multiprocessing import Process,Lock

def work(lock,n):
    lock.acquire()
    print('%s: %s is running' % (n, os.getpid()))
    time.sleep(random.random())
    print('%s: %s is done' % (n, os.getpid()))
    lock.release()
if __name__ == '__main__':
    lock=Lock()
    for i in range(3):
        p=Process(target=work,args=(lock,i))
        p.start()



  上面這種情況雖然使用加鎖的形式實現了順序的執行,但是程式又重新變成串行了,這樣確實會浪費了時間,卻保證了資料的安全。

  接下來,我們以模擬搶票為例,來看看資料安全的重要性。 


(1)多程序同時搶購餘票
#檔案db的內容為:{"count":1}
#注意一定要用雙引號,不然json無法識別
#併發執行,效率高,但競爭寫同一檔案,資料寫入錯亂
from multiprocessing import Process,Lock
import time,json,random
def search():
    dic=json.load(open('db'))
    print('\033[43m剩餘票數%s\033[0m' %dic['count'])

def get():
    dic=json.load(open('db'))
    time.sleep(0.1) #模擬讀資料的網路延遲
    if dic['count'] >0:
        dic['count']-=1
        time.sleep(0.2) #模擬寫資料的網路延遲
        json.dump(dic,open('db','w'))
        print('\033[43m購票成功\033[0m')

def task():
    search()
    get()

if __name__ == '__main__':
    for i in range(100): #模擬併發100個客戶端搶票
        p=Process(target=task)
        p.start()


(2)使用鎖來保證資料安全
#檔案db的內容為:{"count":5}
#注意一定要用雙引號,不然json無法識別
#併發執行,效率高,但競爭寫同一檔案,資料寫入錯亂
from multiprocessing import Process,Lock
import time,json,random
def search():
    dic=json.load(open('db'))
    print('\033[43m剩餘票數%s\033[0m' %dic['count'])

def get():
    dic=json.load(open('db'))
    time.sleep(random.random()) #模擬讀資料的網路延遲
    if dic['count'] >0:
        dic['count']-=1
        time.sleep(random.random()) #模擬寫資料的網路延遲
        json.dump(dic,open('db','w'))
        print('\033[32m購票成功\033[0m')
    else:
        print('\033[31m購票失敗\033[0m')

def task(lock):
    search()
    lock.acquire()
    get()
    lock.release()

if __name__ == '__main__':
    lock = Lock()
    for i in range(100): #模擬併發100個客戶端搶票
        p=Process(target=task,args=(lock,))
        p.start()


總結:

加鎖可以保證多個程序修改同一塊資料時,同一時間只能有一個任務可以進行修改,即序列的修改,沒錯,速度是慢了,但犧牲了速度卻保證了資料安全。
雖然可以用檔案共享資料實現程序間通訊,但問題是:
1.效率低
2.需要自己加鎖處理

為此mutiprocessing模組為我們提供了基於訊息的IPC通訊機制:佇列和管道。
1 佇列和管道都是將資料存放於記憶體中
2 佇列又是基於(管道+鎖)實現的,可以讓我們從複雜的鎖問題中解脫出來,
我們應該儘量避免使用共享資料,儘可能使用訊息傳遞和佇列,避免處理複雜的同步和鎖問題,而且在程序數目增多時,往往可以獲得更好的可獲展性。

訊號量 —— multiprocess.Semaphore(瞭解)

訊號量介紹Semaphore

●互斥鎖同時只允許一個執行緒更改資料,而訊號量Semaphore是同時允許一定數量的執行緒更改資料 。

●假設商場裡有4個迷你唱吧,所以同時可以進去4個人,如果來了第五個人就要在外面等待,等到有人出來才能再進去玩。

●實現: 訊號量同步基於內部計數器,每呼叫一次acquire(),計數器減1;每呼叫一次release(),計數器加1.當計數器為0時,acquire()呼叫被阻塞。這是迪科斯徹(Dijkstra)訊號量概念P()和V()的Python實現。訊號量同步機制適用於訪問像伺服器這樣的有限資源。

●訊號量與程序池的概念很像,但是要區分開,訊號量涉及到加鎖的概念

#例子
from multiprocessing import Process,Semaphore
import time,random

def go_ktv(sem,user):
    sem.acquire()
    print('%s 佔到一間ktv小屋' %user)
    time.sleep(random.randint(0,3)) #模擬每個人在ktv中待的時間不同
    sem.release()

if __name__ == '__main__':
    sem=Semaphore(4)
    p_l=[]
    for i in range(13):
        p=Process(target=go_ktv,args=(sem,'user%s' %i,))
        p.start()
        p_l.append(p)

    for i in p_l:
        i.join()
    print('============》')

事件 —— multiprocess.Event(瞭解)

事件介紹

●python執行緒的事件用於主執行緒控制其他執行緒的執行,事件主要提供了三個方法 set、wait、clear。

●事件處理的機制:全域性定義了一個“Flag”,如果“Flag”值為 False,那麼當程式執行 event.wait 方法時就會阻塞,如果“Flag”值為True,那麼event.wait 方法時便不再阻塞。

●clear:將“Flag”設定為False
●set:將“Flag”設定為True

#紅綠燈例項
from multiprocessing import Process, Event
import time, random


def car(e, n):
    while True:
        if not e.is_set():  # 程序剛開啟,is_set()的值是Flase,模擬訊號燈為紅色
            print('\033[31m紅燈亮\033[0m,car%s等著' % n)
            e.wait()    # 阻塞,等待is_set()的值變成True,模擬訊號燈為綠色
            print('\033[32m車%s 看見綠燈亮了\033[0m' % n)
            time.sleep(random.randint(3, 6))
            if not e.is_set():   #如果is_set()的值是Flase,也就是紅燈,仍然回到while語句開始
                continue
            print('車開遠了,car', n)
            break


def police_car(e, n):
    while True:
        if not e.is_set():# 程序剛開啟,is_set()的值是Flase,模擬訊號燈為紅色
            print('\033[31m紅燈亮\033[0m,car%s等著' % n)
            e.wait(0.1) # 阻塞,等待設定等待時間,等待0.1s之後沒有等到綠燈就闖紅燈走了
            if not e.is_set():
                print('\033[33m紅燈,警車先走\033[0m,car %s' % n)
            else:
                print('\033[33;46m綠燈,警車走\033[0m,car %s' % n)
        break



def traffic_lights(e, inverval):
    while True:
        time.sleep(inverval)
        if e.is_set():
            print('######', e.is_set())
            e.clear()  # ---->將is_set()的值設定為False
        else:
            e.set()    # ---->將is_set()的值設定為True
            print('***********',e.is_set())


if __name__ == '__main__':
    e = Event()
    for i in range(10):
        p=Process(target=car,args=(e,i,))  # 建立是個程序控制10輛車
        p.start()

    for i in range(5):
        p = Process(target=police_car, args=(e, i,))  # 建立5個程序控制5輛警車
        p.start()
    t = Process(target=traffic_lights, args=(e, 10))  # 建立一個程序控制紅綠燈
    t.start()

    print('============》')