1. 程式人生 > >python並發編程之多進程(二):互斥鎖(同步鎖)&進程其他屬性&進程間通信(queue)&生產者消費者模型

python並發編程之多進程(二):互斥鎖(同步鎖)&進程其他屬性&進程間通信(queue)&生產者消費者模型

互斥 數據 socket pan copy src too 如果 搶票

一,互斥鎖,同步鎖

進程之間數據不共享,但是共享同一套文件系統,所以訪問同一個文件,或同一個打印終端,是沒有問題的,

競爭帶來的結果就是錯亂,如何控制,就是加鎖處理

part1:多個進程共享同一打印終端

技術分享圖片技術分享圖片 技術分享圖片
#並發運行,效率高,但競爭同一打印終端,帶來了打印錯亂
from multiprocessing import Process
import os,time
def work():
    print(‘%s is running‘ %os.getpid())
    time.sleep(2)
    print(‘%s is done‘ %os.getpid())

if __name__ == ‘__main__‘:
    for i in range(3):
        p=Process(target=work)
        p.start()
技術分享圖片 並發運行,效率高,但競爭同一打印終端,帶來了打印錯亂 技術分享圖片技術分享圖片 技術分享圖片
#由並發變成了串行,犧牲了運行效率,但避免了競爭
from multiprocessing import Process,Lock
import os,time
def work(lock):
    lock.acquire()
    print(‘%s is running‘ %os.getpid())
    time.sleep(2)
    print(‘%s is done‘ %os.getpid())
    lock.release()
if __name__ == ‘__main__‘:
    lock=Lock()
    for i in range(3):
        p=Process(target=work,args=(lock,))
        p.start()
技術分享圖片 加鎖:由並發變成了串行,犧牲了運行效率,但避免了競爭

part2:多個進程共享同一文件

文件當數據庫,模擬搶票

技術分享圖片技術分享圖片 技術分享圖片
#文件db的內容為:{"count":1}
#註意一定要用雙引號,不然json無法識別
from multiprocessing import Process,Lock
import time,json,random
def search():
    dic=json.load(open(‘db.txt‘))
    print(‘\033[43m剩余票數%s\033[0m‘ %dic[‘count‘])

def get():
    dic=json.load(open(‘db.txt‘))
    time.sleep(0.1) #模擬讀數據的網絡延遲
    if dic[‘count‘] >0:
        dic[‘count‘]-=1
        time.sleep(0.2) #模擬寫數據的網絡延遲
        json.dump(dic,open(‘db.txt‘,‘w‘))
        print(‘\033[43m購票成功\033[0m‘)

def task(lock):
    search()
    get()
if __name__ == ‘__main__‘:
    lock=Lock()
    for i in range(100): #模擬並發100個客戶端搶票
        p=Process(target=task,args=(lock,))
        p.start()
技術分享圖片 並發運行,效率高,但競爭寫同一文件,數據寫入錯亂 技術分享圖片技術分享圖片 技術分享圖片
#文件db的內容為:{"count":1}
#註意一定要用雙引號,不然json無法識別
from multiprocessing import Process,Lock
import time,json,random
def search():
    dic=json.load(open(‘db.txt‘))
    print(‘\033[43m剩余票數%s\033[0m‘ %dic[‘count‘])

def get():
    dic=json.load(open(‘db.txt‘))
    time.sleep(0.1) #模擬讀數據的網絡延遲
    if dic[‘count‘] >0:
        dic[‘count‘]-=1
        time.sleep(0.2) #模擬寫數據的網絡延遲
        json.dump(dic,open(‘db.txt‘,‘w‘))
        print(‘\033[43m購票成功\033[0m‘)

def task(lock):
    search()
    lock.acquire()
    get()
    lock.release()
if __name__ == ‘__main__‘:
    lock=Lock()
    for i in range(100): #模擬並發100個客戶端搶票
        p=Process(target=task,args=(lock,))
        p.start()
技術分享圖片 加鎖:購票行為由並發變成了串行,犧牲了運行效率,但保證了數據安全

總結:

加鎖可以保證多個進程修改同一塊數據時,同一時間只能有一個任務可以進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。
雖然可以用文件共享數據實現進程間通信,但問題是:
1.效率低
2.需要自己加鎖處理

為此mutiprocessing模塊為我們提供了基於消息的IPC通信機制:隊列和管道。
1 隊列和管道都是將數據存放於內存中
2 隊列又是基於(管道+鎖)實現的,可以讓我們從復雜的鎖問題中解脫出來,
我們應該盡量避免使用共享數據,盡可能使用消息傳遞和隊列,避免處理復雜的同步和鎖問題,而且在進程數目增多時,往往可以獲得更好的可獲展性。

二,進程的其他屬性

註意:在windows中Process()必須放到# if __name__ == ‘__main__‘:下

技術分享圖片技術分享圖片
Since Windows has no fork, the multiprocessing module starts a new Python process and imports the calling module. 
If Process() gets called upon import, then this sets off an infinite succession of new processes (or until your machine runs out of resources). 
This is the reason for hiding calls to Process() inside

if __name__ == "__main__"
since statements inside this if-statement will not get called upon import.
由於Windows沒有fork,多處理模塊啟動一個新的Python進程並導入調用模塊。 
如果在導入時調用Process(),那麽這將啟動無限繼承的新進程(或直到機器耗盡資源)。 
這是隱藏對Process()內部調用的原,使用if __name__ == “__main __”,這個if語句中的語句將不會在導入時被調用。
詳細解釋

創建並開啟子進程的兩種方式

技術分享圖片技術分享圖片
#開進程的方法一:
import time
import random
from multiprocessing import Process
def piao(name):
    print(‘%s piaoing‘ %name)
    time.sleep(random.randrange(1,5))
    print(‘%s piao end‘ %name)



p1=Process(target=piao,args=(‘egon‘,)) #必須加,號
p2=Process(target=piao,args=(‘alex‘,))
p3=Process(target=piao,args=(‘wupeqi‘,))
p4=Process(target=piao,args=(‘yuanhao‘,))

p1.start()
p2.start()
p3.start()
p4.start()
print(‘主線程‘)
方法一 技術分享圖片技術分享圖片
#開進程的方法二:
import time
import random
from multiprocessing import Process


class Piao(Process):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        print(‘%s piaoing‘ %self.name)

        time.sleep(random.randrange(1,5))
        print(‘%s piao end‘ %self.name)

p1=Piao(‘egon‘)
p2=Piao(‘alex‘)
p3=Piao(‘wupeiqi‘)
p4=Piao(‘yuanhao‘)

p1.start() #start會自動調用run
p2.start()
p3.start()
p4.start()
print(‘主線程‘)
方法二

練習1:把上周所學的socket通信變成並發的形式

技術分享圖片技術分享圖片
from socket import *
from multiprocessing import Process

server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind((‘127.0.0.1‘,8080))
server.listen(5)

def talk(conn,client_addr):
    while True:
        try:
            msg=conn.recv(1024)
            if not msg:break
            conn.send(msg.upper())
        except Exception:
            break

if __name__ == ‘__main__‘: #windows下start進程一定要寫到這下面
    while True:
        conn,client_addr=server.accept()
        p=Process(target=talk,args=(conn,client_addr))
        p.start()
server端 技術分享圖片技術分享圖片
from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect((‘127.0.0.1‘,8080))


while True:
    msg=input(‘>>: ‘).strip()
    if not msg:continue

    client.send(msg.encode(‘utf-8‘))
    msg=client.recv(1024)
    print(msg.decode(‘utf-8‘))
多個client端 技術分享圖片技術分享圖片
每來一個客戶端,都在服務端開啟一個進程,如果並發來一個萬個客戶端,要開啟一萬個進程嗎,你自己嘗試著在你自己的機器上開啟一萬個,10萬個進程試一試。
解決方法:進程池
這麽實現有沒有問題???

Process對象的join方法

技術分享圖片技術分享圖片
from multiprocessing import Process
import time
import random

class Piao(Process):
    def __init__(self,name):
        self.name=name
        super().__init__()
    def run(self):
        print(‘%s is piaoing‘ %self.name)
        time.sleep(random.randrange(1,3))
        print(‘%s is piao end‘ %self.name)


p=Piao(‘egon‘)
p.start()
p.join(0.0001) #等待p停止,等0.0001秒就不再等了
print(‘開始‘)
join:主進程等,等待子進程結束 技術分享圖片技術分享圖片
from multiprocessing import Process
import time
import random
def piao(name):
    print(‘%s is piaoing‘ %name)
    time.sleep(random.randint(1,3))
    print(‘%s is piao end‘ %name)

p1=Process(target=piao,args=(‘egon‘,))
p2=Process(target=piao,args=(‘alex‘,))
p3=Process(target=piao,args=(‘yuanhao‘,))
p4=Process(target=piao,args=(‘wupeiqi‘,))

p1.start()
p2.start()
p3.start()
p4.start()

#有的同學會有疑問:既然join是等待進程結束,那麽我像下面這樣寫,進程不就又變成串行的了嗎?
#當然不是了,必須明確:p.join()是讓誰等?
#很明顯p.join()是讓主線程等待p的結束,卡住的是主線程而絕非進程p,

#詳細解析如下:
#進程只要start就會在開始運行了,所以p1-p4.start()時,系統中已經有四個並發的進程了
#而我們p1.join()是在等p1結束,沒錯p1只要不結束主線程就會一直卡在原地,這也是問題的關鍵
#join是讓主線程等,而p1-p4仍然是並發執行的,p1.join的時候,其余p2,p3,p4仍然在運行,等#p1.join結束,可能p2,p3,p4早已經結束了,這樣p2.join,p3.join.p4.join直接通過檢測,無需等待
# 所以4個join花費的總時間仍然是耗費時間最長的那個進程運行的時間
p1.join()
p2.join()
p3.join()
p4.join()

print(‘主線程‘)


#上述啟動進程與join進程可以簡寫為
# p_l=[p1,p2,p3,p4]
# 
# for p in p_l:
#     p.start()
# 
# for p in p_l:
#     p.join()
有了join,程序不就是串行了嗎???

Process對象的其他方法或屬性(了解)

技術分享圖片技術分享圖片
#進程對象的其他方法一:terminate,is_alive
from multiprocessing import Process
import time
import random

class Piao(Process):
    def __init__(self,name):
        self.name=name
        super().__init__()

    def run(self):
        print(‘%s is piaoing‘ %self.name)
        time.sleep(random.randrange(1,5))
        print(‘%s is piao end‘ %self.name)


p1=Piao(‘egon1‘)
p1.start()

p1.terminate()#關閉進程,不會立即關閉,所以is_alive立刻查看的結果可能還是存活
print(p1.is_alive()) #結果為True

print(‘開始‘)
print(p1.is_alive()) #結果為False
terminate與is_alive 技術分享圖片技術分享圖片
from multiprocessing import Process
import time
import random
class Piao(Process):
    def __init__(self,name):
        # self.name=name
        # super().__init__() #Process的__init__方法會執行self.name=Piao-1,
        #                    #所以加到這裏,會覆蓋我們的self.name=name

        #為我們開啟的進程設置名字的做法
        super().__init__()
        self.name=name

    def run(self):
        print(‘%s is piaoing‘ %self.name)
        time.sleep(random.randrange(1,3))
        print(‘%s is piao end‘ %self.name)

p=Piao(‘egon‘)
p.start()
print(‘開始‘)
print(p.pid) #查看pid
name與pid

三,隊列queue

進程彼此之間互相隔離,要實現進程間通信(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的

 創建隊列的類(底層就是以管道和鎖定的方式實現)

1 Queue([maxsize]):創建共享的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。 

參數介紹:

1 maxsize是隊列中允許最大項數,省略則無大小限制。    

  方法介紹:

主要方法: 技術分享圖片 技術分享圖片
1 q.put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。如果blocked為True(默認值),並且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩余的空間。如果超時,會拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會立即拋出Queue.Full異常。
2 q.get方法可以從隊列讀取並且刪除一個元素。同樣,get方法有兩個可選參數:blocked和timeout。如果blocked為True(默認值),並且timeout為正值,那麽在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果隊列為空,則立即拋出Queue.Empty異常.
3  
4 q.get_nowait():同q.get(False)
5 q.put_nowait():同q.put(False)
6 
7 q.empty():調用此方法時q為空則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中又加入了項目。
8 q.full():調用此方法時q已滿則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中的項目被取走。
9 q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()一樣
技術分享圖片 技術分享圖片 其他方法(了解):
1 q.cancel_join_thread():不會在進程退出時自動連接後臺線程。可以防止join_thread()方法阻塞
2 q.close():關閉隊列,防止隊列中加入更多數據。調用此方法,後臺線程將繼續寫入那些已經入隊列但尚未寫入的數據,但將在此方法完成時馬上關閉。如果q被垃圾收集,將調用此方法。關閉隊列不會在隊列使用者中產生任何類型的數據結束信號或異常。例如,如果某個使用者正在被阻塞在get()操作上,關閉生產者中的隊列不會導致get()方法返回錯誤。
3 q.join_thread():連接隊列的後臺線程。此方法用於在調用q.close()方法之後,等待所有隊列項被消耗。默認情況下,此方法由不是q的原始創建者的所有進程調用。調用q.cancel_join_thread方法可以禁止這種行為

 應用:

技術分享圖片技術分享圖片 技術分享圖片
‘‘‘
multiprocessing模塊支持進程間通信的兩種主要形式:管道和隊列
都是基於消息傳遞實現的,但是隊列接口
‘‘‘

from multiprocessing import Process,Queue
import time
q=Queue(3)


#put ,get ,put_nowait,get_nowait,full,empty
q.put(3)
q.put(3)
q.put(3)
print(q.full()) #滿了

print(q.get())
print(q.get())
print(q.get())
print(q.empty()) #空了
技術分享圖片 View Code

四,生產者消費者

進程彼此之間互相隔離,要實現進程間通信(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的

 創建隊列的類(底層就是以管道和鎖定的方式實現)

1 Queue([maxsize]):創建共享的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。 

參數介紹:

1 maxsize是隊列中允許最大項數,省略則無大小限制。    

  方法介紹:

主要方法: 技術分享圖片 技術分享圖片
1 q.put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。如果blocked為True(默認值),並且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩余的空間。如果超時,會拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會立即拋出Queue.Full異常。
2 q.get方法可以從隊列讀取並且刪除一個元素。同樣,get方法有兩個可選參數:blocked和timeout。如果blocked為True(默認值),並且timeout為正值,那麽在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果隊列為空,則立即拋出Queue.Empty異常.
3  
4 q.get_nowait():同q.get(False)
5 q.put_nowait():同q.put(False)
6 
7 q.empty():調用此方法時q為空則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中又加入了項目。
8 q.full():調用此方法時q已滿則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中的項目被取走。
9 q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()一樣
技術分享圖片 技術分享圖片 其他方法(了解):
1 q.cancel_join_thread():不會在進程退出時自動連接後臺線程。可以防止join_thread()方法阻塞
2 q.close():關閉隊列,防止隊列中加入更多數據。調用此方法,後臺線程將繼續寫入那些已經入隊列但尚未寫入的數據,但將在此方法完成時馬上關閉。如果q被垃圾收集,將調用此方法。關閉隊列不會在隊列使用者中產生任何類型的數據結束信號或異常。例如,如果某個使用者正在被阻塞在get()操作上,關閉生產者中的隊列不會導致get()方法返回錯誤。
3 q.join_thread():連接隊列的後臺線程。此方法用於在調用q.close()方法之後,等待所有隊列項被消耗。默認情況下,此方法由不是q的原始創建者的所有進程調用。調用q.cancel_join_thread方法可以禁止這種行為

 應用:

技術分享圖片技術分享圖片 技術分享圖片
‘‘‘
multiprocessing模塊支持進程間通信的兩種主要形式:管道和隊列
都是基於消息傳遞實現的,但是隊列接口
‘‘‘

from multiprocessing import Process,Queue
import time
q=Queue(3)


#put ,get ,put_nowait,get_nowait,full,empty
q.put(3)
q.put(3)
q.put(3)
print(q.full()) #滿了

print(q.get())
print(q.get())
print(q.get())
print(q.empty()) #空了
技術分享圖片 View Code

生產者消費者模型

在並發編程中使用生產者和消費者模式能夠解決絕大多數並發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。

為什麽要使用生產者和消費者模式

在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麽生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那麽消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。

什麽是生產者消費者模式

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。

基於隊列實現生產者消費者模型

技術分享圖片技術分享圖片
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print(‘\033[45m%s 吃 %s\033[0m‘ %(os.getpid(),res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res=‘包子%s‘ %i
        q.put(res)
        print(‘\033[44m%s 生產了 %s\033[0m‘ %(os.getpid(),res))

if __name__ == ‘__main__‘:
    q=Queue()
    #生產者們:即廚師們
    p1=Process(target=producer,args=(q,))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,))

    #開始
    p1.start()
    c1.start()
    print(‘主‘)
View Code

此時的問題是主進程永遠不會結束,原因是:生產者p在生產完後就結束了,但是消費者c在取空了q之後,則一直處於死循環中且卡在q.get()這一步。

解決方式無非是讓生產者在生產完畢後,往隊列中再發一個結束信號,這樣消費者在接收到結束信號後就可以break出死循環

技術分享圖片技術分享圖片
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到結束信號則結束
        time.sleep(random.randint(1,3))
        print(‘\033[45m%s 吃 %s\033[0m‘ %(os.getpid(),res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res=‘包子%s‘ %i
        q.put(res)
        print(‘\033[44m%s 生產了 %s\033[0m‘ %(os.getpid(),res))
    q.put(None) #發送結束信號
if __name__ == ‘__main__‘:
    q=Queue()
    #生產者們:即廚師們
    p1=Process(target=producer,args=(q,))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,))

    #開始
    p1.start()
    c1.start()
    print(‘主‘)
生產者在生產完畢後發送結束信號None

註意:結束信號None,不一定要由生產者發,主進程裏同樣可以發,但主進程需要等生產者結束後才應該發送該信號

技術分享圖片技術分享圖片
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到結束信號則結束
        time.sleep(random.randint(1,3))
        print(‘\033[45m%s 吃 %s\033[0m‘ %(os.getpid(),res))

def producer(q):
    for i in range(2):
        time.sleep(random.randint(1,3))
        res=‘包子%s‘ %i
        q.put(res)
        print(‘\033[44m%s 生產了 %s\033[0m‘ %(os.getpid(),res))

if __name__ == ‘__main__‘:
    q=Queue()
    #生產者們:即廚師們
    p1=Process(target=producer,args=(q,))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,))

    #開始
    p1.start()
    c1.start()

    p1.join()
    q.put(None) #發送結束信號
    print(‘主‘)
主進程在生產者生產完畢後發送結束信號None

但上述解決方式,在有多個生產者和多個消費者時,我們則需要用一個很low的方式去解決

技術分享圖片技術分享圖片
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到結束信號則結束
        time.sleep(random.randint(1,3))
        print(‘\033[45m%s 吃 %s\033[0m‘ %(os.getpid(),res))

def producer(name,q):
    for i in range(2):
        time.sleep(random.randint(1,3))
        res=‘%s%s‘ %(name,i)
        q.put(res)
        print(‘\033[44m%s 生產了 %s\033[0m‘ %(os.getpid(),res))



if __name__ == ‘__main__‘:
    q=Queue()
    #生產者們:即廚師們
    p1=Process(target=producer,args=(‘包子‘,q))
    p2=Process(target=producer,args=(‘骨頭‘,q))
    p3=Process(target=producer,args=(‘泔水‘,q))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,))
    c2=Process(target=consumer,args=(q,))

    #開始
    p1.start()
    p2.start()
    p3.start()
    c1.start()

    p1.join() #必須保證生產者全部生產完畢,才應該發送結束信號
    p2.join()
    p3.join()
    q.put(None) #有幾個生產者就應該發送幾次結束信號None
    q.put(None) #發送結束信號
    q.put(None) #發送結束信號
    print(‘主‘)
有幾個生產者就需要發送幾次結束信號:相當low

其實我們的思路無非是發送結束信號而已,有另外一種隊列提供了這種機制

技術分享圖片
   #JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列允許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。

   #參數介紹:
    maxsize是隊列中允許最大項數,省略則無大小限制。    
  #方法介紹:
    JoinableQueue的實例p除了與Queue對象相同的方法之外還具有:
    q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。如果調用此方法的次數大於從隊列中刪除項目的數量,將引發ValueError異常
    q.join():生產者調用此方法進行阻塞,直到隊列中所有的項目均被處理。阻塞將持續到隊列中的每個項目均調用q.task_done()方法為止
技術分享圖片 技術分享圖片技術分享圖片
from multiprocessing import Process,JoinableQueue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print(‘\033[45m%s 吃 %s\033[0m‘ %(os.getpid(),res))

        q.task_done() #向q.join()發送一次信號,證明一個數據已經被取走了

def producer(name,q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res=‘%s%s‘ %(name,i)
        q.put(res)
        print(‘\033[44m%s 生產了 %s\033[0m‘ %(os.getpid(),res))
    q.join()


if __name__ == ‘__main__‘:
    q=JoinableQueue()
    #生產者們:即廚師們
    p1=Process(target=producer,args=(‘包子‘,q))
    p2=Process(target=producer,args=(‘骨頭‘,q))
    p3=Process(target=producer,args=(‘泔水‘,q))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,))
    c2=Process(target=consumer,args=(q,))
    c1.daemon=True
    c2.daemon=True

    #開始
    p_l=[p1,p2,p3,c1,c2]
    for p in p_l:
        p.start()

    p1.join()
    p2.join()
    p3.join()
    print(‘主‘) 
    
    #主進程等--->p1,p2,p3等---->c1,c2
    #p1,p2,p3結束了,證明c1,c2肯定全都收完了p1,p2,p3發到隊列的數據
    #因而c1,c2也沒有存在的價值了,應該隨著主進程的結束而結束,所以設置成守護進程
View

python並發編程之多進程(二):互斥鎖(同步鎖)&進程其他屬性&進程間通信(queue)&生產者消費者模型