1. 程式人生 > >Python中的生產者消費者模型

Python中的生產者消費者模型

ssi png 問題 import odi rand com 共享數據 守護

了解知識點:

1、守護進程:

·什麽是守護進程:

守護進程其實就是一個‘子進程’,守護即伴隨,守護進程會伴隨主進程的代碼運行完畢後而死掉

·為何用守護進程:

當該子進程內的代碼在父進程代碼運行完畢後就沒有存在的意義了,就應該將進程設置為守護進程,會在父進程代碼結束後死掉

from multiprocessing import Process

import time,os

def task(name):
print(‘%s is running‘%name)
time.sleep(3)

if __name__ == ‘__main__‘:
p1=Process(target=task,args=(‘守護進程‘,))
p2=Process(target=task,args=(‘正常的子進程‘,))
p1.daemon=True # 一定要放到p.start()之前
p1.start()
p2.start()
print(‘主‘)

守護進程舉例

技術分享圖片

以下是守護進程會迷惑人的範例:

#主進程代碼運行完畢,守護進程就會結束
from multiprocessing import Process
import time
def foo():
print(123)
time.sleep(1)
print("end123")

def bar():
print(456)
time.sleep(3)
print("end456")

if __name__ == ‘__main__‘:
p1=Process(target=foo)
p2=Process(target=bar)

p1.daemon=True
p1.start()
p2.start()
print("main-------")

‘‘‘
main-------
456
enn456
‘‘‘


‘‘‘
main-------
123
456
enn456
‘‘‘

‘‘‘
123
main-------
456
end456
‘‘‘

2、互斥鎖:

互斥鎖:可以將要執行任務的部分代碼(只涉及到修改共享數據的代碼)變成串行

join:是要執行任務的所有代碼整體串行

強調:必須是lock.acquire()一次,然後 lock.release()釋放一次,才能繼續lock.acquire(),不能連續的lock.acquire()。否者程序停在原地。
互斥鎖vs join: 
大前提:二者的原理都是一樣,都是將並發變成串行,從而保證有序(在多個程序共享一個資源時,為保證有序不亂,需將並發變成串行)
區別一:join是按照人為指定的順序執行,而互斥鎖是所以進程平等地競爭,誰先搶到誰執行
區別二:互斥鎖可以讓一部分代碼(修改共享數據的代碼)串行,而join只能將代碼整體串行(詳見搶票系統)

from multiprocessing import Process,Lock
import json
import os
import time
import random

def check():
time.sleep(1) # 模擬網路延遲
with open(‘db.txt‘,‘rt‘,encoding=‘utf-8‘) as f:
dic=json.load(f)
print(‘%s 查看到剩余票數 [%s]‘ %(os.getpid(),dic[‘count‘]))

def get():
with open(‘db.txt‘,‘rt‘,encoding=‘utf-8‘) as f:
dic=json.load(f)
time.sleep(2)
if dic[‘count‘] > 0:
# 有票
dic[‘count‘]-=1
time.sleep(random.randint(1,3))
with open(‘db.txt‘,‘wt‘,encoding=‘utf-8‘) as f:
json.dump(dic,f)
print(‘%s 購票成功‘ %os.getpid())
else:
print(‘%s 沒有余票‘ %os.getpid())


def task(mutex):
# 查票
check()

#購票
mutex.acquire() # 互斥鎖不能連續的acquire,必須是release以後才能重新acquire
get()
mutex.release()

# with mutex:
# get()

if __name__ == ‘__main__‘:
mutex=Lock()
for i in range(10):
p=Process(target=task,args=(mutex,))
p.start()
# p.join()

模擬搶票

3、IPC通信機制

進程之間通信必須找到一種介質,該介質必須滿足
1、是所有進程共享的
2、必須是內存空間
附加:幫我們自動處理好鎖的問題
 
a、   from multiprocessing import Manager(共享內存,但要自己解決鎖的問題)
b、   IPC中的隊列(Queue) 共享,內存,自動處理鎖的問題(最常用)
c、   IPC中的管道(Pipe),共享,內存,需自己解決鎖的問題

a、用Manager(了解知識點)

from multiprocessing import Process,Manager,Lock
import time

mutex=Lock()

def task(dic,lock):
lock.acquire()
temp=dic[‘num‘]
time.sleep(0.1)
dic[‘num‘]=temp-1
lock.release()

if __name__ == ‘__main__‘:
m=Manager()
dic=m.dict({‘num‘:10})

l=[]
for i in range(10):
p=Process(target=task,args=(dic,mutex))
l.append(p)
p.start()
for p in l:
p.join()
print(dic)

b、用隊列Queue

1)共享的空間

2)是內存空間

3)自動幫我們處理好鎖定問題

from multiprocessing import Queue
q=Queue(3) #設置隊列中maxsize個數為三
q.put(‘first‘)
q.put({‘second‘:None})
q.put(‘三‘)
# q.put(4) #阻塞。不報錯,程序卡在原地等待隊列中清出一個值。默認blok=True
print(q.get())
print(q.get())
print(q.get())

強調:
1、隊列用來存成進程之間溝通的消息,數據量不應該過大
2、maxsize的值超過的內存限制就變得毫無意義

了解:
q=Queue(3)
q.put(‘first‘,block=False)
q.put(‘second‘,block=False)
q.put(‘third‘,block=False)
q.put(‘fourth‘,block=False) #報錯 queue.Full

q.put(‘first‘,block=True)
q.put(‘second‘,block=True)
q.put(‘third‘,block=True)
q.put(‘fourth‘,block=True,timeout=3) #等待3秒後若還進不去報錯。註意timeout不能和block=False連用

q.get(block=False)
q.get(block=False)
q.get(block=False)
q.get(block=False) #報錯 queue.Empty

q.get(block=True)
q.get(block=True)
q.get(block=True)
q.get(block=True,timeout=2) #等待2秒後還取不出東西則報錯。註意timeout不能和block=False連用

了解

4、生產者與消費者模型

該模型中包含兩類重要的角色:
1、生產者:將負責造數據的任務比喻為生產者
2、消費者:接收生產者造出的數據來做進一步的處理,該類人物被比喻成消費者
 
實現生產者消費者模型三要素
1、生產者
2、消費者
3、隊列
什麽時候用該模型:
程序中出現明顯的兩類任何,一類任務是負責生產,另外一類任務是負責處理生產的數據的
 
該模型的好處:
1、實現了生產者與消費者解耦和
2、平衡了生產者的生產力與消費者的處理數據的能力

註意:生產者消費者模型是解決問題的思路不是技術。可以用進程和隊列來實現,也可以用其他的來實現。

from multiprocessing import JoinableQueue,Process
import time
import os
import random

def producer(name,food,q):
for i in range(3):
res=‘%s%s‘ %(food,i)
time.sleep(random.randint(1,3))
# 往隊列裏丟
q.put(res)
print(‘\033[45m%s 生產了 %s\033[0m‘ %(name,res))
# q.put(None)

def consumer(name,q):
while True:
#從隊列裏取走
res=q.get()
if res is None:break
time.sleep(random.randint(1,3))
print(‘\033[46m%s 吃了 %s\033[0m‘ %(name,res))
q.task_done()

if __name__ == ‘__main__‘:
q=JoinableQueue()
# 生產者們
p1=Process(target=producer,args=(‘egon‘,‘包子‘,q,))
p2=Process(target=producer,args=(‘楊軍‘,‘泔水‘,q,))
p3=Process(target=producer,args=(‘猴老師‘,‘翔‘,q,))
# 消費者們
c1=Process(target=consumer,args=(‘Alex‘,q,))
c2=Process(target=consumer,args=(‘wupeiqidsb‘,q,))
c1.daemon=True
c2.daemon=True

p1.start()
p2.start()
p3.start()
c1.start()
c2.start()

p1.join()
p2.join()
p3.join()
q.join() #等待隊列被取幹凈
# q.join() 結束意味著
# 主進程的代碼運行完畢--->(生產者運行完畢)+隊列中的數據也被取幹凈了->消費者沒有存在的意義

# print(‘主‘)

Python中的生產者消費者模型