1. 程式人生 > >20181229(守護程序,互斥鎖,IPC,生產者和消費者模型)

20181229(守護程序,互斥鎖,IPC,生產者和消費者模型)

 

 

一、守護程序

守護程序:一個程序B守護另一個程序A,當被守護的程序A結束,程序B也就結束了。(不一定同生,但會同死)

 

兩個特點:

①守護程序會在主程序程式碼執行結束後就終止

②守護程序內無法再開啟子程序,否則丟擲異常。

注意:程序之間是互相獨立的,主程序程式碼執行結束,守護程序隨即終止

 

應用場景:如果主程序認為一旦自己結束,子程序也就沒有繼續執行的必要了,就可以將子程序設定為守護程序。(例如:qq正在呼叫自身的下載檔案功能,但是此時退出了qq,下載程序也就可以直接關閉了)

方法為:process.daemon=True,必須放置在子程序開啟前。

from multiprocessing import Process
import time

def task(name):
   print('%s is running' % name)
   time.sleep(3)
   print('%s is running' % name)  # 等待三秒的時候,被守護程序已經執行完了,所以守護程序會被回收,此句話就不會列印了。


if __name__ == '__main__':
   obj = Process(target=task, args=('守護程序',))
   obj.daemon=True  # 必須在start前設定,會限制p建立子程序,且父程序結束子程序馬上殉葬結束。
   obj.start()  # 傳送訊號給作業系統
   time.sleep(1)
   print('被守護程序')
輸出結果:
守護程序 is running
被守護程序

 

二、互斥鎖

互斥鎖:將併發變為序列(即一個一個執行)

from multiprocessing import Process,Lock
import time,random

mutex=Lock()  # 例項化互斥鎖,也可以放到main下面。
# 強調:必須是lock.acquire()一次,然後 lock.release()釋放一次,才能繼續lock.acquire(),不能連續的lock.acquire(),否則會形成阻塞,程式卡死。

def task1(lock):
   lock.acquire() # 得到這把鎖,此時其他的同級子程序都只能等待當前程序將鎖釋放之後才有機會執行。
   print('task1:名字是egon')
   time.sleep(random.randint(1,3))
   print('task1:性別是male')
   time.sleep(random.randint(1,3))
   print('task1:年齡是18')
   lock.release()  # 鎖用完之後,一定要釋放,否則其他子程序無法進行

def task2(lock):
   lock.acquire()  # 是一個阻塞的函式 會等到別的程序釋放鎖才能繼續執行
   print('task2:名字是alex')
   time.sleep(random.randint(1,3))
   print('task2:性別是male')
   time.sleep(random.randint(1,3))
   print('task2:年齡是78')
   lock.release()


def task3(lock):
   lock.acquire()
   print('task3:名字是lxx')
   time.sleep(random.randint(1,3))
   print('task3:性別是female')
   time.sleep(random.randint(1,3))
   print('task3:年齡是30')
   lock.release()


if __name__ == '__main__':
   p1=Process(target=task1,args=(mutex,))  # 建立程序並傳參,將鎖傳給各個子程序
   p2=Process(target=task2,args=(mutex,))
   p3=Process(target=task3,args=(mutex,))
   p1.start()   # 三個子程式都能啟動,但是一旦碰到鎖,誰先搶到誰先執行,其他的要等。
   p2.start()
   p3.start()

 

互斥鎖與join的區別:

二者原理都一樣,都是將併發變為序列,從而保證資料增刪改查的有序性,不至於讓資料發生錯亂。

二者的區別在於join是按照人為指定順序執行,程序執行順序是固定的,而互斥鎖是所有程序公平競爭,誰先搶到鎖,誰就能先執行;且互斥鎖可以指定某一部分程式碼序列,其餘部分程式碼併發,而join則是整個子程序程式碼完全序列。

互斥鎖的本質是一個布林型別的資料,在執行程式碼前,會先判斷這個值。

 

互斥鎖在搶票中的應用:

import json
from multiprocessing import Process,Lock
import time
import random

# 檢視剩餘票數
def check_ticket(usr):
   time.sleep(random.randint(1,3))
   with open("ticket.json","r",encoding="utf-8") as f:   # 提前做好json檔案,做成字典。
       dic = json.load(f)
       print("%s檢視 剩餘票數:%s" % (usr,dic["count"]))

def buy_ticket(usr):
   with open("ticket.json","r",encoding="utf-8") as f:
       dic = json.load(f)
       if dic["count"] > 0:
           time.sleep(random.randint(1,3))
           dic["count"] -= 1
           with open("ticket.json", "w", encoding="utf-8") as f2:  #
               json.dump(dic,f2)
               print("%s 購票成功!" % usr)
        else:
           print("票被搶走了!%s購票失敗" % usr)


def task(usr,lock):
   check_ticket(usr)  # 查票可以併發
   lock.acquire()  # 排票就要串行了
   buy_ticket(usr)
   lock.release() # 買好票後要釋放鎖

if __name__ == '__main__':
   lock = Lock()
   for i in range(10):   # 啟動10個程序去買票,模仿10個人。
       p = Process(target=task,args=("使用者%s" % i,lock))
       p.start()
       
輸出結果:   #每次結果可能都不一致
使用者0檢視 剩餘票數:1
使用者2檢視 剩餘票數:1
使用者3檢視 剩餘票數:1
使用者1檢視 剩餘票數:1
使用者0 購票成功!
票被搶走了!使用者2購票失敗
票被搶走了!使用者3購票失敗
票被搶走了!使用者1購票失敗
使用者5檢視 剩餘票數:0
票被搶走了!使用者5購票失敗
使用者4檢視 剩餘票數:0
票被搶走了!使用者4購票失敗
使用者7檢視 剩餘票數:0
票被搶走了!使用者7購票失敗
使用者6檢視 剩餘票數:0
票被搶走了!使用者6購票失敗
使用者9檢視 剩餘票數:0
票被搶走了!使用者9購票失敗
使用者8檢視 剩餘票數:0
票被搶走了!使用者8購票失敗

Rlock

RLock 表示可重入鎖,其特點是可以多次執行acquire而不會阻塞。
如果在多程序中使用Rlock,並且一個程序a 執行了多次acquire,其他程序b要想獲得這個鎖,需要程序a把鎖解開,並且鎖了幾次就要解幾次。
普通鎖如果多次執行acquire將會鎖死(阻塞)

 

from multiprocessing import RLock,Process
import time
def task(i,lock):
   lock.acquire()
   lock.acquire()
   print(i)
   time.sleep(3)
   lock.release()
   lock.release()  #第一個過來 睡3秒 第二個過來了 睡3秒   第一個列印1 第二個列印2

if __name__ == '__main__':
   lock = RLock() # 重入鎖,即多次鎖
   p1 = Process(target=task,args=(1,lock))
   p1.start()
   p2 = Process(target=task,args=(2,lock))
   p2.start()

死鎖:指鎖無法開啟,導致程式無法繼續執行。即兩把鎖交給了兩個人,這兩個人還都需要對方的那把鎖,互相誰也無法釋放鎖,也得不到對方的鎖。

程式中儘可能不要使用多把鎖,防止發生死鎖現象

from multiprocessing import Process,Lock
import time
def task1(l1,l2,i):
   l1.acquire()  # 拿走了l1的鎖,但是task2拿走了l2的鎖,兩個程序後續都無法進行。
   print("盤子被%s搶走了" % i)
   time.sleep(1)
   l2.acquire()
   print("筷子被%s搶走了" % i)
   print("吃飯..")
   l1.release()
   l2.release()


def task2(l1,l2,i):
   l2.acquire()
   print("筷子被%s搶走了" % i)
   l1.acquire()
   print("盤子被%s搶走了" % i)
   print("吃飯..")
   l1.release()
   l2.release()


if __name__ == '__main__':
   l1 = Lock()
   l2 = Lock()
   Process(target=task1,args=(l1,l2,1)).start()
   Process(target=task2,args=(l1,l2,2)).start()

 

三、IPC通訊

IPC:程序間通訊, 程序間相互獨立,所以要解決程序間相互傳輸資料的問題。 1、使用共享檔案,多個程序同時讀寫一個檔案 受限IO,速度慢 2、管道,基於記憶體速度快,但是是單向的,比較麻煩 3、申請共享記憶體空間,程序們可以共享這片區域 基於記憶體速度快,但是資料量不能太大

from multiprocessing import Process,Manager,Lock  # 匯入manager
import time

mutex=Lock()

def task(dic,lock):
   lock.acquire()  # 如果不加鎖,都會讀取字典,然後修改,結果就是變成9
   temp=dic['num']
   time.sleep(0.1)  # 讀取共享空間之後,沉睡0.1秒,讓所有程序都能執行起來
   dic['num']=temp-1
   lock.release()

if __name__ == '__main__':
   m=Manager()  # 例項化manager
   dic=m.dict({'num':10}) # 建立一個字典物件,這是子程序的共享空間
   l=[]
   for i in range(10):
       p=Process(target=task,args=(dic,mutex))
       l.append(p)
       p.start()

   for p in l:
       p.join()  # 子程序都結束後,再去檢視字典
   print(dic)
輸出結果:
{'num': 0}
如果不加鎖,CPU和記憶體足夠快,輸出結果可能為{'num':9}

 

四、佇列

一種資料容器,特點是先進先出。(棧是後進先出)

優點:是程序的共享空間,可以自動處理鎖的問題(如上小節中如果鎖處理不好,就會發生資料錯亂)

即便在多程序下,也可以保證資料不會錯亂,因為put和get預設阻塞。

注意點:

1、佇列用來處理程序間的資料,且在記憶體中,所以資料量不應過大;

2、限制maxsize的值如果超過了記憶體就變得毫無意義。

from multiprocessing import Queue

q = Queue(1)  # 例項化一個佇列,最多可以存一個數據
q.put("張三")  # 將資料放進佇列中
print(q.get())  # 將資料從佇列中拿出
q.put("李四") # 當容器裝滿時,put預設會阻塞 。如果上一步中沒有get,此時就會阻塞
q.put("福布斯",False) # False表示不會阻塞 無論容器是滿了 都會強行塞 如果滿了就拋異常

print(q.get())
print(q.get()) # 當容器中資料時,get預設阻塞
print(q.get(timeout=3)) # timeout 僅用於阻塞時,此處表示會等待3秒,三秒後如果還取不到資料,就會報錯queue.Empty

print("over")  # 因為阻塞,此句不會列印

 

def put(self, obj, block=True, timeout=None):  # block表示阻塞,timeout表示等待時間。
   pass

def get(self, block=True, timeout=None):
   pass

 

五、生產者消費者模型

生產者:資料的生產者

消費者:處理資料者

 

生產者消費者模型三要素:生產者、消費者、佇列

 

應用場景:程式中出現明顯的兩類,一類負責生產資料,另一類負責處理資料時。

 

該模型優點:

1、實現了資料生產者與資料消費者之間的解耦合

2、平衡了生產力和消費力,即生產者只關心生產即可,消費者只關心處理資料即可,二者通過佇列溝通。

 

import random
from multiprocessing import Process,Queue
import time
# 爬資料
def get_data(q):

   for num in range(5):
       print("正在爬取第%s個數據" % num)
       time.sleep(random.randint(1,2))
       print("第%s個數據 爬取完成" % num)
       # 把資料裝到佇列中
       q.put("第%s個數據" % num)


def parse_data(q):  # 生產者只關心生產即可,佇列數最大為5,如果佇列滿了就進入阻塞狀態。
   for num in range(5):
       # 取出資料
       data = q.get()
       print("正在解析%s" % data)
       time.sleep(random.randint(1, 2))
       print("%s 解析完成" % data)

if __name__ == '__main__':
   # 共享資料容器
   q = Queue(5)
   #生產者程序
   produce =  Process(target=get_data,args=(q,))
   produce.start()
   #消費者程序
   customer = Process(target=parse_data,args=(q,))
   customer.start()