1. 程式人生 > >python併發程式設計基礎之守護程序、佇列、鎖

python併發程式設計基礎之守護程序、佇列、鎖

併發程式設計2

1.守護程序

什麼是守護程序?

表示程序A守護程序B,當被守護程序B結束後,程序A也就結束。

from multiprocessing import Process
import time
​
def task():
    print('妃子的一生')
    time.sleep(15)
    print('妃子死了')
​
if __name__ == '__main__':
    fz = Process(target=task)
    fz.daemon = True #將子程序作為主程序的守護程序。必須在開啟之前設定
    fz.start()
    
print('皇帝登基了') time.sleep(1) print('做了10年皇帝') time.sleep(5) print('皇帝駕崩了!')

 

上面的程式碼說明了什麼叫守護程序,需要注意的是,守護程序須在程序開啟前設定,就是改變了,本質其實就是改變了Process類裡面的daemon屬性,預設是False

應用場景:之所以開啟子程序,是為了幫助主程序完成某個任務,然而主程序認為自己的事情一旦完成,子程序就沒必要存在,那就可以將子程序設定未主程序的守護程序。

例如:QQ下載檔案,QQ退出,檔案就終止下載。

 

2、互斥鎖

1.什麼是鎖(Lock)?

鎖是multiprocessing模組中的一個類(Lock)。

2.為什麼要有鎖?

序列效率低,但是資料不會出現問題。併發效率高,但是資料可能出錯。

當多程序共享一個數據時,可能造成資料錯亂。

                1.使用join方法可以讓資料不會錯亂,但是會讓程序變為序列。效率不高。

2.使用lock可以將資料加鎖,其他程式使用時需等待當前程式使用完成。lock只是使部分程式碼序列,其他部分還是併發。

3.互斥鎖的示例

Lock類的兩個方法acquire(鎖)和release(釋放),acquire將狀態變為True,  release是將狀態變為False

鎖的本質就是一個bool型別,在執行前會先判斷這個值。

def acquire(self, blocking=True, timeout=-1):
    passdef release(self):
    pass
#鎖的使用
#注意:使用鎖的必須保證鎖是同一個
from multiprocessing import Process,Lock
import random
import time
​
​
def task1(lock):
    lock.acquire()
    print('1111')
    time.sleep(random.randint(1, 3))
    print('11111111')
    lock.release()
def task2(lock):
    lock.acquire()
    print('222222')
    time.sleep(random.randint(1,2))
    print('2222222222222')
    lock.release()
def task3(lock):
    lock.acquire()
    print('3333333333')
    time.sleep(random.randint(1, 3))
    print('33333333333333333333333')
    lock.release()
​
if __name__ == '__main__':
    lock = Lock()
    p1 = Process(target=task1, args=(lock,))
    p1.start()
    p2 = Process(target=task2, args=(lock,))
    p2.start()
    p3 = Process(target=task3, args=(lock,))
    p3.start()
    
 ###輸出結果除了1的位置固定,後兩個不固定,但是如果出現了2,則必須執行完2
1111
11111111
3333333333
33333333333333333333333
222222
2222222222222

 

4.鎖的應用

應用場景:購票軟體的購票過程。

邏輯分析:1.使用者都可以檢視餘票

    2.但是其中一個使用者發起請求購票,其他使用者不能發起請求購票(鎖狀態)

import json
from multiprocessing import Process,Lock
import random
import time
#檢視餘票功能
def check_ticket(name):
    time.sleep(random.randint(1, 3))
    with open('ticket.json', mode='rt', encoding='utf-8')as f:
        dic = json.load(f)
        print('%s正在檢視餘票,  餘票:%s 張' % (name, dic['count']))
#購票功能
def buy_ticket(name):
    with open('ticket.json', mode='rt', encoding='utf-8')as f:
        dic = json.load(f)
        if dic['count'] > 0:
            time.sleep(random.randint(1, 3))
            dic['count'] -= 1
            print('%s購票成功! 餘票:%s張' % (name, dic['count']))
            with open('ticket.json', mode='wt', encoding='utf-8')as f:
                json.dump(dic, f)
#使用者購票流程
def task(name,lock):
    check_ticket(name)
    lock.acquire()
    buy_ticket(name)
    lock.release()
    
if __name__ == '__main__':
    lock = Lock()
    for i in range(1, 11):
        p = Process(target=task, args=('使用者%s' % i, lock))
        p.start()

 

5.重入鎖(RLock)

RLock類,可重入的鎖。Lock類只能有一個acquire方法,RLock可以有多個,如果在多程序中使用Rlock,並且一個程序A執行了多次acquire,其他程序B想要獲取鎖,則需將所有的acquire釋放可以獲取。

from multiprocessing import RLock
lock = RLock()
lock.acquire()
lock.acquire()
lock.acquire()
print('haha')
lock.release()
lock.release()
lock.release()
#注意重入鎖,鎖幾次就需要開幾次

 

6.死鎖

1.什麼是死鎖?

死鎖是指鎖無法開啟,導致程式卡死。必須是多把鎖才能形成死鎖。正常開發時,要避免這種情況。通常一把鎖可以解決大多數問題。

##以一個吃飯需要盤子l1 和筷子 l2 二者缺一不可來說明死鎖
from multiprocessing import Process,Lock
import time
def task1(l1, l2, i):
    l1.acquire()
    print('盤子被%s搶走了'% i)
    time.sleep(2)
​
    l2.acquire()
    print('筷子被%s搶走了'% i)
    print('吃飯!')
    l1.release()
    l2.release()
​
def task2(l1, l2, i):
    l2.acquire()      ###task2先搶了筷子,導致task1無法完成後續動作
    print('筷子被%s搶走了' % i)
​
​
    l1.acquire()
    print('盤子被%s搶走了'% i)
    print('吃飯!')
    l1.release()
    l2.release()
​
​
if __name__ == '__main__':
    l1 = Lock()
    l2 = Lock()
    Process(target=task1, args=(l1, l2, 1)).start()   #開啟兩個程序
    Process(target=task2, args=(l1, l2, 2)).start()
    
​
    
#在開發中要避免這種問題。如果有使用則應該注意將task2調整鎖的位置。
def task2(l1, l2, i):
    l1.acquire()
    print('盤子被%s搶走了'% i)
    l2.acquire()      
    print('筷子被%s搶走了' % i)
    print('吃飯!')
    l1.release()
    l2.release()

 

7.ICP程序間通訊

由於程序間記憶體是相互獨立的,所以需要對應積極的方案能夠使得程序之間可以相互傳遞資料。

解決方案:

1.使用共享檔案。多個程序同時讀寫同一個檔案。但是IO速度慢,傳輸的資料大小不受限制。

2.管道。基於記憶體,速度塊,但是是單向的,用起來麻煩。

3.佇列。申請共享記憶體空間,多個程序可以共享這個記憶體區域。

#建立共享檔案
from multiprocessing import Manager,Process,Lock
def work(d):
    # with lock:
        d['count']-=1if __name__ == '__main__':
​
    with Manager() as m:
        dic=m.dict({'count':100}) #建立一個共享的字典
        p_l=[]
        for i in range(100):
            p=Process(target=work,args=(dic,))
            p_l.append(p)
            p.start()
​
        for p in p_l:
            p.join()
        print(dic)

 

8.佇列

程序彼此之間互相隔離,要實現程序間通訊(IPC),multiprocessing模組支援兩種形式:佇列和管道,這兩種方式都是使用訊息傳遞的。

佇列也是一種資料容器,其特點:先進先出。

與堆疊相反:先進後出。  (函式的呼叫就是一種類似堆疊方式)

建立佇列的類(底層就是以管道和鎖定的方式實現):

1.Queue([maxsize]):建立共享的程序佇列,Queue是多程序安全的佇列,可以使用Queue實現多程序之間的資料傳遞

2.maxsize是佇列中允許最大項數,省略則無大小限制。

主要方法

q.put方法用以插入資料到佇列中,put方法還有兩個可選引數:blocked和timeout。如果blocked為True(預設值),並且timeout為正值,該方法會阻塞timeout指定的時間,直到該佇列有剩餘的空間。如果超時,會丟擲Queue.Full異常。如果blocked為False,但該Queue已滿,會立即丟擲Queue.Full異常。
q.get方法可以從佇列讀取並且刪除一個元素。同樣,get方法有兩個可選引數:blocked和timeout。如果blocked為True(預設值),並且timeout為正值,那麼在等待時間內沒有取到任何元素,會丟擲Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果佇列為空,則立即丟擲Queue.Empty異常.
q.get_nowait():同q.get(False)
q.put_nowait():同q.put(False)
q.empty():呼叫此方法時q為空則返回True,該結果不可靠,比如在返回True的過程中,如果佇列中又加入了專案。
q.full():呼叫此方法時q已滿則返回True,該結果不可靠,比如在返回True的過程中,如果佇列中的專案被取走。
q.qsize():返回佇列中目前專案的正確數量,結果也不可靠,理由同q.empty()和q.full()一樣

佇列的優點是:保證資料不會錯亂,即使在多程序中。因為put和get都是阻塞

from multiprocessing import Queue
​
q = Queue(3)    #建立一個佇列,佇列最多存三個資料
q.put('ming')   #存資料
q.put(123)
print(q.get())  #put預設會阻塞,當佇列裝滿時程式會卡住
print(q.get())
q.put('xian')   #
q.put('daidai',False)  #put第二個引數設定為False會強行將資料放入佇列
q.put('j', True, timeout=3)    #timeout等待時間

 


    9.生產者消費者模型

1.什麼是生產者消費者模型?

生產者:生產資料的一方。

消費者:處理使用資料的一方。

例如:一個爬蟲程式:1.爬去資料     2.解析資料

2.怎麼實現?

1.將兩個不同任務分配給不同的程序。

2.提供一個程序共享的資料容器

 

#模型示例:
from multiprocessing import Process, Queue
import time
import random
​
​
def get_data(q):
    for num in range(1, 6):
        print('正在爬取第%s個數據' % num)
        time.sleep(random.randint(1, 2))
        print('第%s個數據爬取完成。' % num)
        q.put('第%s個數據' % num)
​
​
def parse_data(q):
    for num in range(1, 6):
        data = q.get()
        print('正在解析%s' % data)
        time.sleep(random.randint(1, 2))
        print('%s解析完成。' % data)
if __name__ == '__main__':
    q = Queue(5)  # 建立一個可以存放5個數據的佇列
    #生產者模型
    producer = Process(target=get_data, args=(q,))
    producer.start()
    #消費者模型
    consumer = Process(target=parse_data, args=(q,))
    consumer.start()
###輸出結果
正在爬取第1個數據
第1個數據爬取完成。
正在爬取第2個數據
正在解析第1個數據
第2個數據爬取完成。
正在爬取第3個數據
第1個數據解析完成。
正在解析第2個數據
第3個數據爬取完成。
正在爬取第4個數據
第2個數據解析完成。
正在解析第3個數據
第3個數據解析完成。
第4個數據爬取完成。
正在爬取第5個數據
正在解析第4個數據
第5個數據爬取完成。
第4個數據解析完成。
正在解析第5個數據
第5個數據解析完成。
###可以看出兩個模型之間互不干擾,相互獨立。