1. 程式人生 > >python並發編程基礎之守護進程、隊列、鎖

python並發編程基礎之守護進程、隊列、鎖

lock pytho locking ipc 讀取 多進程 sin 默認值 lee

並發編程2

1.守護進程

什麽是守護進程?

表示進程A守護進程B,當被守護進程B結束後,進程A也就結束。

from multiprocessing import Process
import time
?
def task():
    print(妃子的一生)
    time.sleep(15)
    print(妃子死了)
?
if __name__ == __main__:
    fz = Process(target=task)
    fz.daemon = True #將子進程作為主進程的守護進程。必須在開啟之前設置
    fz.start()
    print
(皇帝登基了) time.sleep(1) print(做了10年皇帝) time.sleep(5) print(皇帝駕崩了!)

上面的代碼說明了什麽叫守護進程,需要註意的是,守護進程須在進程開啟前設置,就是改變了,本質其實就是改變了Process類裏面的daemon屬性,默認是False

應用場景:之所以開啟子進程,是為了幫助主進程完成某個任務,然而主進程認為自己的事情一旦完成,子進程就沒必要存在,那就可以將子進程設置未主進程的守護進程。

例如:QQ下載文件,QQ退出,文件就終止下載。

2、互斥鎖

1.什麽是鎖(Lock)?

鎖是multiprocessing模塊中的一個類(Lock)。

2.為什麽要有鎖?

串行效率低,但是數據不會出現問題。並發效率高,但是數據可能出錯。

當多進程共享一個數據時,可能造成數據錯亂。

1.使用join方法可以讓數據不會錯亂,但是會讓進程變為串行。效率不高。

2.使用lock可以將數據加鎖,其他程序使用時需等待當前程序使用完成。lock只是使部分代碼串行,其他部分還是並發。

3.互斥鎖的示例

Lock類的兩個方法acquire(鎖)和release(釋放),acquire將狀態變為True, release是將狀態變為False

鎖的本質就是一個bool類型,在執行前會先判斷這個值。

def acquire(self, blocking=True, timeout=-1):
    
pass ? def release(self): pass #鎖的使用 #註意:使用鎖的必須保證鎖是同一個 from multiprocessing import Process,Lock import random import time ? ? def task1(lock): lock.acquire() print(1111) time.sleep(random.randint(1, 3)) print(11111111) lock.release() def task2(lock): lock.acquire() print(222222) time.sleep(random.randint(1,2)) print(2222222222222) lock.release() def task3(lock): lock.acquire() print(3333333333) time.sleep(random.randint(1, 3)) print(33333333333333333333333) lock.release() ? if __name__ == __main__: lock = Lock() p1 = Process(target=task1, args=(lock,)) p1.start() p2 = Process(target=task2, args=(lock,)) p2.start() p3 = Process(target=task3, args=(lock,)) p3.start() ###輸出結果除了1的位置固定,後兩個不固定,但是如果出現了2,則必須運行完2 1111 11111111 3333333333 33333333333333333333333 222222 2222222222222

?

4.鎖的應用

應用場景:購票軟件的購票過程。

邏輯分析:1.用戶都可以查看余票

2.但是其中一個用戶發起請求購票,其他用戶不能發起請求購票(鎖狀態)

import json
from multiprocessing import Process,Lock
import random
import time
#查看余票功能
def check_ticket(name):
    time.sleep(random.randint(1, 3))
    with open(ticket.json, mode=rt, encoding=utf-8)as f:
        dic = json.load(f)
        print(%s正在查看余票,  余票:%s 張 % (name, dic[count]))
#購票功能
def buy_ticket(name):
    with open(ticket.json, mode=rt, encoding=utf-8)as f:
        dic = json.load(f)
        if dic[count] > 0:
            time.sleep(random.randint(1, 3))
            dic[count] -= 1
            print(%s購票成功! 余票:%s張 % (name, dic[count]))
            with open(ticket.json, mode=wt, encoding=utf-8)as f:
                json.dump(dic, f)
#用戶購票流程
def task(name,lock):
    check_ticket(name)
    lock.acquire()
    buy_ticket(name)
    lock.release()
    
if __name__ == __main__:
    lock = Lock()
    for i in range(1, 11):
        p = Process(target=task, args=(用戶%s % i, lock))
        p.start()

5.重入鎖(RLock)

RLock類,可重入的鎖。Lock類只能有一個acquire方法,RLock可以有多個,如果在多進程中使用Rlock,並且一個進程A執行了多次acquire,其他進程B想要獲取鎖,則需將所有的acquire釋放可以獲取。

from multiprocessing import RLock
lock = RLock()
lock.acquire()
lock.acquire()
lock.acquire()
print(haha)
lock.release()
lock.release()
lock.release()
#註意重入鎖,鎖幾次就需要開幾次

6.死鎖

1.什麽是死鎖?

死鎖是指鎖無法打開,導致程序卡死。必須是多把鎖才能形成死鎖。正常開發時,要避免這種情況。通常一把鎖可以解決大多數問題。

##以一個吃飯需要盤子l1 和筷子 l2 二者缺一不可來說明死鎖
from multiprocessing import Process,Lock
import time
def task1(l1, l2, i):
    l1.acquire()
    print(盤子被%s搶走了% i)
    time.sleep(2)
?
    l2.acquire()
    print(筷子被%s搶走了% i)
    print(吃飯!)
    l1.release()
    l2.release()
?
def task2(l1, l2, i):
    l2.acquire()      ###task2先搶了筷子,導致task1無法完成後續動作
    print(筷子被%s搶走了 % i)
?
?
    l1.acquire()
    print(盤子被%s搶走了% i)
    print(吃飯!)
    l1.release()
    l2.release()
?
?
if __name__ == __main__:
    l1 = Lock()
    l2 = Lock()
    Process(target=task1, args=(l1, l2, 1)).start()   #開啟兩個進程
    Process(target=task2, args=(l1, l2, 2)).start()
    
?
    
#在開發中要避免這種問題。如果有使用則應該註意將task2調整鎖的位置。
def task2(l1, l2, i):
    l1.acquire()
    print(盤子被%s搶走了% i)
    l2.acquire()      
    print(筷子被%s搶走了 % i)
    print(吃飯!)
    l1.release()
    l2.release()

7.ICP進程間通訊

由於進程間內存是相互獨立的,所以需要對應積極的方案能夠使得進程之間可以相互傳遞數據。

解決方案:

1.使用共享文件。多個進程同時讀寫同一個文件。但是IO速度慢,傳輸的數據大小不受限制。

2.管道。基於內存,速度塊,但是是單向的,用起來麻煩。

3.隊列。申請共享內存空間,多個進程可以共享這個內存區域。

#創建共享文件
?
from multiprocessing import Manager,Process,Lock
def work(d):
    # with lock:
        d[count]-=1
?
if __name__ == __main__:
?
    with Manager() as m:
        dic=m.dict({count:100}) #創建一個共享的字典
        p_l=[]
        for i in range(100):
            p=Process(target=work,args=(dic,))
            p_l.append(p)
            p.start()
?
        for p in p_l:
            p.join()
        print(dic)

8.隊列

進程彼此之間互相隔離,要實現進程間通信(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的。

隊列也是一種數據容器,其特點:先進先出。

與堆棧相反:先進後出。 (函數的調用就是一種類似堆棧方式)

創建隊列的類(底層就是以管道和鎖定的方式實現):

1.Queue([maxsize]):創建共享的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞

2.maxsize是隊列中允許最大項數,省略則無大小限制。

主要方法

q.put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。如果blocked為True(默認值),並且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩余的空間。如果超時,會拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會立即拋出Queue.Full異常。
? q.get方法可以從隊列讀取並且刪除一個元素。同樣,get方法有兩個可選參數:blocked和timeout。如果blocked為True(默認值),並且timeout為正值,那麽在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果隊列為空,則立即拋出Queue.Empty異常.
? q.get_nowait():同q.get(False)
? q.put_nowait():同q.put(False)
? q.empty():調用此方法時q為空則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中又加入了項目。
? q.full():調用此方法時q已滿則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中的項目被取走。
? q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()一樣

隊列的優點是:保證數據不會錯亂,即使在多進程中。因為put和get都是阻塞

from multiprocessing import Queue
?
q = Queue(3)    #創建一個隊列,隊列最多存三個數據
q.put(ming)   #存數據
q.put(123)
print(q.get())  #put默認會阻塞,當隊列裝滿時程序會卡住
print(q.get())
q.put(xian)   #
q.put(daidai,False)  #put第二個參數設置為False會強行將數據放入隊列
q.put(j, True, timeout=3)    #timeout等待時間
?


?

9.生產者消費者模型

1.什麽是生產者消費者模型?

生產者:生產數據的一方。

消費者:處理使用數據的一方。

例如:一個爬蟲程序:1.爬去數據 2.解析數據

2.怎麽實現?

1.將兩個不同任務分配給不同的進程。

2.提供一個進程共享的數據容器

#模型示例:
from multiprocessing import Process, Queue
import time
import random
?
?
def get_data(q):
    for num in range(1, 6):
        print(正在爬取第%s個數據 % num)
        time.sleep(random.randint(1, 2))
        print(第%s個數據爬取完成。 % num)
        q.put(第%s個數據 % num)
?
?
def parse_data(q):
    for num in range(1, 6):
        data = q.get()
        print(正在解析%s % data)
        time.sleep(random.randint(1, 2))
        print(%s解析完成。 % data)
if __name__ == __main__:
    q = Queue(5)  # 創建一個可以存放5個數據的隊列
    #生產者模型
    producer = Process(target=get_data, args=(q,))
    producer.start()
    #消費者模型
    consumer = Process(target=parse_data, args=(q,))
    consumer.start()
###輸出結果
正在爬取第1個數據
第1個數據爬取完成。
正在爬取第2個數據
正在解析第1個數據
第2個數據爬取完成。
正在爬取第3個數據
第1個數據解析完成。
正在解析第2個數據
第3個數據爬取完成。
正在爬取第4個數據
第2個數據解析完成。
正在解析第3個數據
第3個數據解析完成。
第4個數據爬取完成。
正在爬取第5個數據
正在解析第4個數據
第5個數據爬取完成。
第4個數據解析完成。
正在解析第5個數據
第5個數據解析完成。
###可以看出兩個模型之間互不幹擾,相互獨立。

python並發編程基礎之守護進程、隊列、鎖