1. 程式人生 > >Python基礎31_multiprocess模組.鎖.佇列

Python基礎31_multiprocess模組.鎖.佇列

一. multiprocess模組 仔細說來,multiprocess不是一個模組而是python中一個操作、管理程序的包。 之所以叫multi是取自multiple的多功能的意思,在這個包中幾乎包含了和程序有關的所有子模組。由於提供的子模組非常多,為了方便大家歸類記憶,我將這部分大致分為四個部分:建立程序部分,程序同步部分,程序池部分,程序之間資料共享。重點強調:程序沒有任何共享狀態,程序修改的資料,改動僅限於該程序內,但是通過一些特殊的方法,可以實現程序之間資料的共享。 1. Process模組 Process是穿件程序的模組, 藉助這個模組, 可以實現程序的建立 Process([group [, target [, name [, args [, kwargs]]]]])由該類例項化一個物件, 表示一個子程序中的任務(尚未啟動) 強調: (1). 需要使用關鍵字的方式來指定引數 (2). args指定的為傳給target函式的位置引數, 是一個元祖形式, 必須有逗號 (1). 看一個程式例項: from multiprocessing import Process def func(): print(12345) # 當我們運行當前這個test.py檔案的時候, 就產生了程序, 這個程序我們稱之為主程序 if __name__ == '__main__': # 將函式註冊到一個程序中, p是一個程序物件, 此時還沒有啟動程序, 只是建立了一個程序物件, 並且func是不加括號的的, 因為加上括號就直接運行了 p = Process(target=func, ) # 告訴作業系統, 給我開啟一個程序, func這個函式就被我們新開的這個程序執行了, 而這個程序是我主程式創建出來的所以稱這個新建立的程序為主程序的子程序, 而主程序又可以稱之為這個新程序的父程序 # 而這個子程序中執行的程式,相當於將現在這個test.py檔案中的程式copy到一個你看不到的python檔案中去執行了,就相當於當前這個檔案,被另外一個py檔案import過去並執行了。 # start並不是直接就去執行了,我們知道程序有三個狀態,程序會進入程序的三個狀態,就緒,(被排程,也就是時間片切換到它的時候)執行,阻塞,並且在這個三個狀態之間不斷的轉換,等待cpu執行時間片到了。 p.start() # 這是主程序的程式,上面開啟的子程序的程式是和主程序的程式同時執行的,我們稱為非同步 print("*"*10) (2). 上面說了,我們通過主程序建立的子程序是非同步執行的,那麼我們就驗證一下,並且看一下子程序和主程序(也就是父程序)的ID號(講一下pid和ppid,使用pycharm舉例),來看看是否是父子關係。 import time import os # os.getpid() # 獲取自己的程序號 # os.getppid() # 獲取自己程序的父程序的ID號 from multiprocessing import Process def func(): print("aaaaa") time.sleep(1) print("子程序>>>", os.getpid()) print("該子程序的父程序>>>", os.getppid()) print(12345) if __name__ == '__main__': p = Process(target=func, ) p.start() print("*"*10) print("父程序>>>", os.getpid()) print("父程序的父程序>>>", os.getppid()) # ********** # 首先打印出來了主程序的程式,然後列印的是子程序的,也就是子程序是非同步執行的,相當於主程序和子程序同時執行著,如果是同步的話,我們先執行的是func(),然後再列印主程序最後的10個*號。 # 父程序>>> 9044 # 父程序的父程序>>> 9528 #我執行的test.py檔案的父程序號,它是pycharm的程序號 # aaaaa # 子程序>>> 10476 # 該子程序的父程序>>> 9044 #是我主程序的ID號,說明主程序為它的父程序 # 12345 (3). 看一個問題,說明linux和windows兩個不同的作業系統建立程序的不同機制導致的不同結果: import time import os from multiprocessing import Process def func(): print('aaaa') time.sleep(1) print('子程序>>',os.getpid()) print('該子程序的父程序>>',os.getppid()) print(12345) print('太白老司機') """如果我在這裡加了一個列印,你會發現執行結果中會出現兩次打印出來的太白老司機,因為我們在主程序中開了一個子程序,子程序中的程式相當於import的主程序中的程式,那麼import的時候會不會執行你import的那個檔案的程式啊,前面學的,是會執行的,所以出現了兩次列印, 其實是因為windows開起程序的機制決定的,在linux下是不存在這個效果的,因為windows使用的是process方法來開啟程序,他就會拿到主程序中的所有程式,而linux下只是去執行我子程序中註冊的那個函式,不會執行別的程式,這也是為什麼在windows執行程式的時候,要加上if __name__ == '__main__':,否則會出現子程序中執行的時候還開啟子程序,那就出現無限迴圈的建立程序了,就報錯了""" if __name__ == '__main__': p = Process(target=func, ) p.start() print("*"*10) print("父程序>>>", os.getpid()) print("父程序的父程序>>>", os.getppid()) # 太白老司機 # ********** # 父程序>>> 11204 # 父程序的父程序>>> 9528 # 太白老司機 # aaaa # 子程序>> 6644 # 該子程序的父程序>> 11204 # 12345 (4). 一個程序的生命週期:如果子程序的執行時間長,那麼等到子程序執行結束程式才結束,如果主程序的執行時間長,那麼主程序執行結束程式才結束,實際上我們在子程序中列印的內容是在主程序的執行結果中看不出來的,但是pycharm幫我們做了優化,因為它會識別到你這是開的子程序,幫你把子程序中列印的內容列印到了顯示臺上。 如果說一個主程序執行完了之後,我們把pycharm關了,但是子程序還沒有執行結束,那麼子程序還存在嗎?這要看你的程序是如何配置的,如果說我們沒有配置說我主程序結束,子程序要跟著結束,那麼主程序結束的時候,子程序是不會跟著結束的,他會自己執行完,如果我設定的是主程序結束,子程序必須跟著結束,那麼就不會出現單獨的子程序(孤兒程序)了,具體如何設定,看下面的守護程序的講解。比如說,我們將來啟動專案的時候,可能通過cmd來啟動,那麼我cmd關閉了你的專案就會關閉嗎,不會的,因為你的專案不能停止對外的服務,對吧 (5). Process類中引數的介紹: 引數介紹: ①. group引數未使用,值始終為None ②. target表示呼叫物件,即子程序要執行的任務 ③. args表示呼叫物件的位置引數元組,args=(1,2,'egon',) ④. kwargs表示呼叫物件的字典,kwargs={'name':'egon','age':18} ⑤. name為子程序的名稱 給要執行的函式傳引數: def func(x,y): print(x) time.sleep(1) print(y) if __name__ == '__main__': p = Process(target=func,args=('姑娘','來玩啊!'))#這是func需要接收的引數的傳送方式。 p.start() print('父程序執行結束!') #執行結果: 父程序執行結束! 姑娘 來玩啊! (6). Process類中各方法的介紹: ①. p.start():啟動程序,並呼叫該子程序中的p.run() ②. p.run():程序啟動時執行的方法,正是它去呼叫target指定的函式,我們自定義類的類中一定要實現該方法 ③. p.terminate():強制終止程序p,不會進行任何清理操作,如果p建立了子程序,該子程序就成了殭屍程序,使用該方法需要特別小心這種情況。如果p還儲存了一個鎖那麼也將不會被釋放,進而導致死鎖 ④. p.is_alive():如果p仍然執行,返回True ⑤. p.join([timeout]):主執行緒等待p終止(強調:是主執行緒處於等的狀態,而p是處於執行的狀態)。timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的程序,而不能join住run開啟的程序 jion方法的例子: 讓主程序加上join的地方等待(也就是阻塞住), 等待子程式執行完之後, 在繼續往下執行主程序, 很多時候, 我們主程序需要子程序的執行結果, 所以必須要等待, join有點像把子程式和主程式拼接起來, 將非同步改為同步執行 import time from multiprocessing import Process def func(x, y): print(x) time.sleep(1) print(y) if __name__ == "__main__": p = Process(target=func, args=("姑娘", "來玩啊")) p.start() print("這裡是非同步的") p.join() print("父程式執行結束") # 列印結果 這裡是非同步的 姑娘 來玩啊 父程式執行結束 用for迴圈開啟多個執行緒: import time import os from multiprocessing import Process def func(x, y): print(x) # time.sleep(1) print(y) if __name__ == '__main__': p_list = [] for i in range(10): p = Process(target=func, args=("姑娘%s"%i, "來玩啊")) p_list.append(p) p.start() # 1、如果加到for迴圈裡面,那麼所有子程序包括父程序就全部變為同步了,因為for迴圈也是主程序的,迴圈第一次的時候,一個程序去執行了,然後這個程序就join住了,那麼for迴圈就不會繼續執行了,等著第一個子程序執行結束才會繼續執行for迴圈去建立第二個子程序。 # 2、如果我不想這樣的,也就是我想所有的子程序是非同步的,然後所有的子程序執行完了再執行主程序 p.join() # 4、這是解決辦法,前提是我們的子程序全部都已經去執行了,那麼我在一次給所有正在執行的子程序加上join,那麼主程序就需要等著所有子程序執行結束才會繼續執行自己的程式了,並且保障了所有子程序是非同步執行的。 [ap.join() for ap in p_list] # 3、如果這樣寫的話,多次執行之後,你會發現會出現主程序的程式比一些子程序先執行完,因為我們p.join()是對最後一個子程序進行了join,也就是說如果這最後一個子程序先於其他子程序執行完,那麼主程序就會去執行,而此時如果還有一些子程序沒有執行完,而主程序執行完了,那麼就會先列印主程序的內容了,這個cpu排程程序的機制有關係,因為我們的電腦可能只有4個cpu,我的子程序加上住程序有11個,雖然我for迴圈是按順序起程序的,但是作業系統一定會按照順序給你執行你的程序嗎,答案是不會的,作業系統會按照自己的演算法來分配程序給cpu去執行,這裡也解釋了我們打印出來的子程序中的內容也是沒有固定順序的原因,因為列印結果也需要呼叫cpu,可以理解成程序在爭搶cpu,如果同學你想問這是什麼演算法,這就要去研究作業系統啦。那我們的想所有子程序非同步執行,然後再執行主程序的這個需求怎麼解決啊 p.join() print("不要錢") 模擬兩個應用場景, 1. 同時對一個檔案進行寫操作, 2. 同時建立多個檔案 import time import os import re from multiprocessing import Process # 多程序同時對一個檔案進行寫操作 # def func(x, y, i): # with open(x, "a", encoding = "utf-8") as f: # print("當前程序%s拿到的檔案的游標位置>>%s" % (os.getpid(), f.tell())) # f.write("%s\n"%y) # 多執行緒同時建立多個檔案 def func(x, y): with open(x, "w", encoding="utf-8") as f: f.write(y) if __name__ == '__main__': p_list = [] for i in range(10): # p = Process(target=func, args=("can_do_girl_lists.txt", "姑娘%s"%(i+1), i+1)) p = Process(target=func, args=("girl/can_do_girl_lists_%s.txt"%(i+1), "姑娘%s"%(i+1))) p_list.append(p) p.start() [ap.join() for ap in p_list] # with open("can_do_girl_lists.txt", "r", encoding="utf-8") as f: # data = f.read() # all_num = re.findall("\d+", data) # print(">>>>>", all_num, ".....%s"%(len(all_num))) # print([i for i in os.walk(r"D:/1PY/Day30")]) print("不要錢") (7). Process類中自帶封裝的各屬性的介紹 ①. p.daemon:預設值為False,如果設為True,代表p為後臺執行的守護程序,當p的父程序終止時,p也隨之終止,並且設定為True後,p不能建立自己的新程序,必須在p.start()之前設定 ②. p.name:程序的名稱 ③. p.pid:程序的pid ④. p.exitcode:程序在執行時為None、如果為–N,表示被訊號N結束(瞭解即可) ⑤. p.authkey:程序的身份驗證鍵,預設是由os.urandom()隨機生成的32字元的字串。這個鍵的用途是為涉及網路連線的底層程序間通訊提供安全性,這類連線只有在具有相同的身份驗證鍵時才能成功(瞭解即可) 2. Process類的使用 注意: 在windows中Process()必須放到 if __name__ == "__main__": 下 由於Windows沒有fork,多處理模組啟動一個新的Python程序並匯入呼叫模組。 如果在匯入時呼叫Process(),那麼這將啟動無限繼承的新程序(或直到機器耗盡資源)。 這是隱藏對Process()內部呼叫的原,使用if __name__ == “__main __”,這個if語句中的語句將不會在匯入時被呼叫。 程序建立的第二種方法(繼承): (1). 程序建立的第二種方法: import os from multiprocessing import Process class MyProcess(Process): def __init__(self, person): super().__init__() # 必須執行父類的__init__方法 self.person = person def run(self): print(os.getpid()) print(self.pid) print("%s正在和女主播聊天" % self.person) # def start(self): # self.run() # print("我是%s" % self.person) if __name__ == '__main__': p1 = MyProcess("Jedan") p2 = MyProcess("太白") p3 = MyProcess("alexDSB") p1.start() p2.start() p2.run() p3.start() p1.join() p2.join() p3.join() (2). 程序之間的資料是隔離的 from multiprocessing import Process n = 100 def work(): global n n = 0 print("子程序內:", n) # print(n) if __name__ == '__main__': p = Process(target=work) p.start() p.join() #等待子程序執行完畢,如果資料共享的話,我子程序是不是通過global將n改為0了,但是你看列印結果,主程序在子程序執行結束之後,仍然是n=100,子程序n=0,說明子程序對n的修改沒有在主程序中生效,說明什麼?說明他們之間的資料是隔離的,互相不影響的 print("主程序內:", n) # 子程序內: 0 # 主程序內: 100 (3). 多執行緒實現多個客戶端通訊 (4). is_alive(), terminate() import time from multiprocessing import Process class Piao(Process): def __init__(self, name): super().__init__() self.name = name def run(self): print("%s is 打飛機" % self.name) s = input("!!!") # 在pycharm下子程序中不能input輸入, 會報錯 EOFError: EOF when reading a line, 因為子程序中沒有像我們主程序這樣的在pycharm下的控制檯可以輸入東西的地方 time.sleep(2) print("%s is 打飛機結束" % self.name) if __name__ == '__main__': p1 = Piao("太白") p1.start() p1.join() # time.sleep(5) p1.terminate() # 關閉程序, 不會立即關閉, 有個等著作業系統去關閉這個程序的時間, 所以is_alive立刻檢視的結果可能還是存活, 但是稍微等一會就關閉了 print(p1.is_alive()) # 檢視子程式是否還存活 print("等會...") time.sleep(1) print(p1.is_alive()) (5). 殭屍程序(有害)和孤兒程序(無害) 殭屍程序: 一個程序使用fork建立子程序, 如果子程式退出, 而父程式沒有呼叫wait或waitpid獲取子程序的狀態訊息, 那麼子程序的程序描述符任然儲存在系統中, 這個程序稱之為殭屍程序 任何一個子程序(init除外)在exit()之後,並非馬上就消失掉,而是留下一個稱為殭屍程序(Zombie)的資料結構,等待父程序處理。這是每個子程序在結束時都要經過的階段。如果子程序在exit()之後,父程序沒有來得及處理,這時用ps命令就能看到子程序的狀態是“Z”。如果父程序能及時 處理,可能用ps命令就來不及看到子程序的殭屍狀態,但這並不等於子程序不經過殭屍狀態。 如果父程序在子程序結束之前退出,則子程序將由init接管。init將會以父程序的身份對殭屍狀態的子程序進行處理。 孤兒程序: 一個父程序退出, 而它的一個或多個子程序還在執行, 那麼那些子程序將會成為孤兒程序, 孤兒程序將會被init程序(程序號為1)所收養, 並由init程序對他們完成狀態收集工作 殭屍程序的危害場景: 例如有個程序,它定期的產 生一個子程序,這個子程序需要做的事情很少,做完它該做的事情之後就退出了,因此這個子程序的生命週期很短,但是,父程序只管生成新的子程序,至於子程序 退出之後的事情,則一概不聞不問,這樣,系統執行上一段時間之後,系統中就會存在很多的僵死程序,倘若用ps命令檢視的話,就會看到很多狀態為Z的程序。 嚴格地來說,僵死程序並不是問題的根源,罪魁禍首是產生出大量僵死程序的那個父程序。因此,當我們尋求如何消滅系統中大量的僵死程序時,答案就是把產生大 量僵死程序的那個元凶槍斃掉(也就是通過kill傳送SIGTERM或者SIGKILL訊號啦)。槍斃了元凶程序之後,它產生的僵死程序就變成了孤兒進 程,這些孤兒程序會被init程序接管,init程序會wait()這些孤兒程序,釋放它們佔用的系統程序表中的資源,這樣,這些已經僵死的孤兒程序 就能瞑目而去了。 3. 守護程序 如果主程序結束了, 由主程序建立的子程序必須跟著結束, 這時就需要守護程序 主程序建立守護程序: 其一. 守護程序會在主程序程式碼執行結束後就終止 其二. 守護程序內無法再開啟子程序, 否則會丟擲異常AssertionError: daemonic processes are not allowed to have children 注意: 程序之間是相互獨立的, 主程序程式碼執行結束, 守護程序隨即終止 import os, time from multiprocessing import Process class MyProcess(Process): def __init__(self, person): super().__init__() self.name = name deg run(self): print(os.getpid(), self.name) print("%s正在和女主播聊天" % self.name) time.sleep(3) if __name__ == "__mian__": p = MyProcess("太白") p.daemon = True # 一定要在p.start()之前設定p為守護程序, 禁止p建立子程序, 並且父程序程式碼執行結束, p即終止執行 p.start() print("寶") 4. 程序同步(鎖) 利用併發程式設計可以更加充分的利用io資源, 但也帶來了新的問題: 程序之間的資料不共享, 但是共享同一套檔案系統, 所以訪問同一個檔案或者同一個列印終端, 是沒有問題的, 而共享帶來的是競爭, 競爭帶來的是錯亂, 如何控制, 就是枷鎖 (1). 多程序搶佔輸出資源, 導致列印混亂的示例: import os, time, random from multiprocessing import Process def work(n): print("%s:%s is running" % (n, os.getpid())) time.sleep(random.randint(1,3)) print("%s:%s is done" % (n, os.getpid())) if __name__ == '__main__': for i in range(5): p = Process(target = work, args = (i,)) p.start() # 3:6716 is running # 4:10220 is running # 0:5524 is running # 1:3164 is running # 2:10036 is running # 3:6716 is done # 4:10220 is done # 1:3164 is done # 0:5524 is done # 2:10036 is done 兩個問題: 一. 每個程序中work函式的第一個列印就不是按照我們for迴圈的順序來列印的 二. 每個work都要兩個列印, 但時第一個列印的順序是3-4-0-1-2, 而第二個列印的順序是3-4-1-0-2, 說明我們一個程序中的程式順序都亂了 第二個問題可以通過枷鎖來解決, 第一個問題是沒法解決的, 因為程序開到了核心, 由作業系統來決定程序的排程, 無法控制 (2). 加鎖, 由併發改為了序列 import os, time from multiprocessing import Process, Lock def work(n, l): # 加鎖, 保證每一次只有一個程序在執行鎖裡面的程式, 這一段程式對於所有寫上這個鎖的程序, 大家都變成了序列 lock.acquire() print("%s:%s is running" % (n, os.getpid())) time.sleep(1) print("%s:%s is done" % (n, os.getpid())) # 解鎖, 解鎖之後其他程序才能去執行自己的程式 lock.release() if __name_ == "__main__": lock = Lock() for i in range(5): p = Process(target=work, args=(i, l)) p.start() # 2:4032 is running # 2:4032 is done # 0:8444 is running # 0:8444 is done # 4:2872 is running # 4:2872 is done # 3:7480 is running # 3:7480 is done # 1:5196 is running # 1:5196 is done 結果分析:(自己去多次執行一下,看看結果,我拿出其中一個結果來看)通過結果我們可以看出,多程序剛開始去執行的時候,每次執行,首先打印出來哪個程序的程式是不固定的,但是我們解決了上面列印混亂示例程式碼的第二個問題,那就是同一個程序中的兩次列印都是先完成的,然後才切換到下一個程序去,列印下一個程序中的兩個列印結果,說明我們控制住了同一程序中的程式碼執行順序,如果涉及到多個程序去操作同一個資料或者檔案的時候,就不擔心資料算錯或者檔案中的內容寫入混亂了。 上面這種情況雖然使用加鎖的形式實現了順序的執行, 但是程式又重新變成串行了, 這樣確實會浪費了時間, 但是卻保證了資料的安全 (3). 模擬搶票 import json, time from multiprocessing import Process, Lock def check(n): ticket_dic = json.load(open("ticketinfo.json", "r", encoding="utf-8")) print(ticket_dic) print("%s查看了餘票數, 尚有餘票%s張" % (n, ticket_dic["餘票"])) def buy(n): ticket_dic = json.load(open("ticketinfo.json", "r", encoding="utf-8")) if ticket_dic["餘票"]>0: time.sleep(1) ticket_dic["餘票"] -= 1 json.dump(ticket_dic, open("ticketinfo.json", "w", encoding="utf-8"), ensure_ascii=False) print("%s購票成功" % n) else: print("沒票了") def task(n, lock): # def task(n): check(n) lock.acquire() buy(n) lock.release() if __name__ == '__main__': lock = Lock() for i in range(5): p = Process(target=task, args=(i, lock)) # p = Process(target=task, args=(i, )) p.start() # {'餘票': 2} # 1查看了餘票數, 尚有餘票2張 # {'餘票': 2} # 2查看了餘票數, 尚有餘票2張 # {'餘票': 2} # 0查看了餘票數, 尚有餘票2張 # {'餘票': 2} # 3查看了餘票數, 尚有餘票2張 # {'餘票': 2} # 4查看了餘票數, 尚有餘票2張 # 1購票成功 # 2購票成功 # 沒票了 # 沒票了 # 沒票了 程序鎖總結: 加鎖可以保證多個程序修改同一塊資料時, 同一時間只能有一個任務可以進行修改, 即序列的修改, 沒錯, 速度是慢了, 但保證了資料安全. 雖然可以用檔案共享資料實現程序間通訊, 但問題是: 1效率低(共享資料基於檔案, 而檔案是硬碟上的資料). 2需要自己加鎖處理 因此我們最好找一種解決方案能夠兼顧: 1效率高(多個程序共享一塊記憶體的資料) 2幫我們處理好問題, 這就是multiprocessing模組為我們提供的基於訊息的IPC通訊機制: 佇列和管道 佇列和管道都是將資料存放於記憶體中 佇列又是基於(管道+鎖)實現的,可以讓我們從複雜的鎖問題中解脫出來, 我們應該儘量避免使用共享資料,儘可能使用訊息傳遞和佇列,避免處理複雜的同步和鎖問題,而且在程序數目增多時,往往可以獲得更好的可獲展性。 IPC通訊機制(瞭解):IPC是intent-Process Communication的縮寫,含義為程序間通訊或者跨程序通訊,是指兩個程序之間進行資料交換的過程。IPC不是某個系統所獨有的,任何一個作業系統都需要有相應的IPC機制, 比如Windows上可以通過剪貼簿、管道和郵槽等來進行程序間通訊,而Linux上可以通過命名共享內容、訊號量等來進行程序間通訊。Android它也有自己的程序間通訊方式,Android建構在Linux基礎上,繼承了一 部分Linux的通訊方式。 5. 佇列 程序彼此之間互相隔離,要實現程序間通訊(IPC),multiprocessing模組支援兩種形式:佇列和管道,這兩種方式都是使用訊息傳遞的。佇列就像一個特殊的列表,但是可以設定固定長度,並且從前面插入資料,從後面取出資料,先進先出。 Queue([maxsize]) 建立共享的程序佇列。 引數 :maxsize是佇列中允許的最大項數。如果省略此引數,則無大小限制。 底層佇列使用管道和鎖實現。 (1). 方法介紹 q = Queue([maxsize]) 建立共享的程序佇列。maxsize是佇列中允許的最大項數。如果省略此引數,則無大小限制。底層佇列使用管道和鎖定實現。另外,還需要執行支援執行緒以便佇列中的資料傳輸到底層管道中。 Queue的例項q具有以下方法: q.get( [ block [ ,timeout ] ] ) 返回q中的一個專案。如果q為空,此方法將阻塞,直到佇列中有專案可用為止。block用於控制阻塞行為,預設為True. 如果設定為False,將引發Queue.Empty異常(定義在Queue模組中)。timeout是可選超時時間,用在阻塞模式中。如果在制定的時間間隔內沒有專案變為可用,將引發Queue.Empty異常。 q.get_nowait( ) 同q.get(False)方法。 q.put(item [, block [,timeout ] ] ) 將item放入佇列。如果佇列已滿,此方法將阻塞至有空間可用為止。block控制阻塞行為,預設為True。如果設定為False,將引發Queue.Full 異常(定義在Queue庫模組中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引發Queue.Full異常。 q.qsize() 返回佇列中目前專案的正確數量。此函式的結果並不可靠,因為在返回結果和在稍後程式中使用結果之間,佇列中可能新增或刪除了專案。在某些系統上,此方法可能引發NotImplementedError異常。 q.empty() 如果呼叫此方法時 q為空,返回True。如果其他程序或執行緒正在往佇列中新增專案,結果是不可靠的。也就是說,在返回和使用結果之間,佇列中可能已經加入新的專案 q.full() 如果q已滿,返回為True. 由於執行緒的存在,結果也可能是不可靠的(參考q.empty()方法)。。 q.close() 關閉佇列,防止佇列中加入更多資料。呼叫此方法時,後臺執行緒將繼續寫入那些已入佇列但尚未寫入的資料,但將在此方法完成時馬上關閉。如果q被垃圾收集,將自動呼叫此方法。關閉佇列不會在佇列使用者中生成任何型別的資料結束訊號或異常。例如,如果某個使用者正被阻塞在get()操作上,關閉生產者中的佇列不會導致get()方法返回錯誤。 q.cancel_join_thread() 不會再程序退出時自動連線後臺執行緒。這可以防止join_thread()方法阻塞。 q.join_thread() 連線佇列的後臺執行緒。此方法用於在呼叫q.close()方法後,等待所有佇列項被消耗。預設情況下,此方法由不是q的原始建立者的所有程序呼叫。呼叫q.cancel_join_thread()方法可以禁止這種行為。 (2). 佇列是程序安全的, 同一時間只能一個程序拿到佇列中的資料 例子: 批量生產輸入放入佇列, 再批量的獲取結果 import os, time import multiprocessing # 向queue中輸入資料的函式 def inputQ(queue): info = str(os.getpid()) + "(put):" + str(time.asctime()) queue.put(info) # 向queue中輸出資料的函式 def outputQ(queue): info = queue.get() print("%s%s %s" % (str(os.getpid()), "(get):", info)) if __name__ == '__main__': # windows下, 如果開啟程序較多的話, 程式會崩潰, 為了防止這個問題, 使用freeze_support()方法來解決 multiprocessing.freeze_support() record1 = [] # store input processes record2 = [] # store output processes queue = multiprocessing.Queue(3) # 輸入程序 for i in range(10): process = multiprocessing.Process(target=inputQ, args=(queue,)) time.sleep(0.2) process.start() record1.append(process) # 輸出程序 for i in range(10): process = multiprocessing.Process(target=outputQ, args=(queue,)) process.start() record2.append(process) # for p in record1: # p.join() [pp.join() for pp in record1] # for p in record2: # p.join() [pp.join() for pp in record2] # 6248(get): 200(put):Wed Jan 9 23:05:38 2019 # 1568(get): 11692(put):Wed Jan 9 23:05:38 2019 # 9292(get): 6736(put):Wed Jan 9 23:05:38 2019 # 3452(get): 12136(put):Wed Jan 9 23:05:38 2019 # 6676(get): 3400(put):Wed Jan 9 23:05:39 2019 # 372(get): 2904(put):Wed Jan 9 23:05:39 2019 # 1396(get): 6352(put):Wed Jan 9 23:05:39 2019 # 1532(get): 4156(put):Wed Jan 9 23:05:39 2019 # 6868(get): 9528(put):Wed Jan 9 23:05:40 2019 # 10832(get): 8336(put):Wed Jan 9 23:05:41 2019 (3). 生產者消費者模型 在併發程式設計中使用生產者和消費者模式能夠解決絕大多數併發問題。該模式通過平衡生產執行緒和消費執行緒的工作能力來提高程式的整體處理資料的速度。 為什麼要使用生產者和消費者模式 線上程世界裡,生產者就是生產資料的執行緒,消費者就是消費資料的執行緒。在多執行緒開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產資料。同樣的道理,如果消費者的處理能力大於生產者,那麼消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。 什麼是生產者消費者模式 生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞佇列來進行通訊,所以生產者生產完資料之後不用等待消費者處理,直接扔給阻塞佇列,消費者不找生產者要資料,而是直接從阻塞佇列裡取,阻塞佇列就相當於一個緩衝區,平衡了生產者和消費者的處理能力,並且我可以根據生產速度和消費速度來均衡一下多少個生產者可以為多少個消費者提供足夠的服務,就可以開多程序等等,而這些程序都是到阻塞佇列或者說是緩衝區中去獲取或者新增資料。 from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(q): for i in range(10): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生產者們:即廚師們 p1=Process(target=producer,args=(q,)) #消費者們:即吃貨們 c1=Process(target=consumer,args=(q,)) #開始 p1.start() c1.start() print('主') (3). 生產者消費者模型總結 #程式中有兩類角色 一類負責生產資料(生產者) 一類負責處理資料(消費者) #引入生產者消費者模型為了解決的問題是: 平衡生產者與消費者之間的工作能力,從而提高程式整體處理資料的速度 #如何實現: 生產者<-->佇列<——>消費者 #生產者消費者模型實現類程式的解耦和 (4). 通過上面基於佇列的生產者消費者程式碼示例,我們發現一個問題:主程序永遠不會結束,原因是:生產者p在生產完後就結束了,但是消費者c在取空了q之後,則一直處於死迴圈中且卡在q.get()這一步。 解決方式無非是讓生產者在生產完畢後,往佇列中再發一個結束訊號,這樣消費者在接收到結束訊號後就可以break出死迴圈 from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() if res is None:break #收到結束訊號則結束 time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(q): for i in range(5): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) q.put(None) #在自己的子程序的最後加入一個結束訊號 if __name__ == '__main__': q=Queue() #生產者們:即廚師們 p1=Process(target=producer,args=(q,)) #消費者們:即吃貨們 c1=Process(target=consumer,args=(q,)) #開始 p1.start() c1.start() print('主') 注意:結束訊號None,不一定要由生產者發,主程序裡同樣可以發,但主程序需要等生產者結束後才應該傳送該訊號 from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() if res is None:break #收到結束訊號則結束 time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(q): for i in range(2): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生產者們:即廚師們 p1=Process(target=producer,args=(q,)) #消費者們:即吃貨們 c1=Process(target=consumer,args=(q,)) #開始 p1.start() c1.start() p1.join() #等待生產者程序結束 q.put(None) #傳送結束訊號 print('主') (5). 但上述解決方式,在有多個生產者和多個消費者時,由於佇列我們說了是程序安全的,我一個程序拿走了結束訊號,另外一個程序就拿不到了,還需要多傳送一個結束訊號,有幾個取資料的程序就要傳送幾個結束訊號,我們則需要用一個很low的方式去解決 from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() if res is None:break #收到結束訊號則結束 time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(name,q): for i in range(2): time.sleep(random.randint(1,3)) res='%s%s' %(name,i) q.put(res) print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生產者們:即廚師們 p1=Process(target=producer,args=('包子',q)) p2=Process(target=producer,args=('骨頭',q)) p3=Process(target=producer,args=('泔水',q)) #消費者們:即吃貨們 c1=Process(target=consumer,args=(q,)) c2=Process(target=consumer,args=(q,)) #開始 p1.start() p2.start() p3.start() c1.start() p1.join() #必須保證生產者全部生產完畢,才應該傳送結束訊號 p2.join() p3.join() q.put(None) #有幾個消費者就應該傳送幾次結束訊號None q.put(None) #傳送結束訊號 print('主') (6). 其實我們的思路無非是傳送結束訊號而已,有另外一種佇列提供了這種機制 #JoinableQueue([maxsize]):這就像是一個Queue物件,但佇列允許專案的使用者通知生成者專案已經被成功處理。通知程序是使用共享的訊號和條件變數來實現的。 #引數介紹: maxsize是佇列中允許最大項數,省略則無大小限制。   #方法介紹: JoinableQueue的例項p除了與Queue物件相同的方法之外還具有: q.task_done():使用者使用此方法發出訊號,表示q.get()的返回專案已經被處理。如果呼叫此方法的次數大於從佇列中刪除專案的數量,將引發ValueError異常 q.join():生產者呼叫此方法進行阻塞,直到佇列中所有的專案均被處理。阻塞將持續到佇列中的每個專案均呼叫q.task_done()方法為止,也就是佇列中的資料全部被get拿走了。 from multiprocessing import Process,JoinableQueue import time,random,os def consumer(q): while True: res=q.get() # time.sleep(random.randint(1,3)) time.sleep(random.random()) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) q.task_done() #向q.join()傳送一次訊號,證明一個數據已經被取走並執行完了 def producer(name,q): for i in range(10): # time.sleep(random.randint(1,3)) time.sleep(random.random()) res='%s%s' %(name,i) q.put(res) print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) print('%s生產結束'%name) q.join() #生產完畢,使用此方法進行阻塞,直到佇列中所有專案均被處理。 print('%s生產結束~~~~~~'%name) if __name__ == '__main__': q=JoinableQueue() #生產者們:即廚師們 p1=Process(target=producer,args=('包子',q)) p2=Process(target=producer,args=('骨頭',q)) p3=Process(target=producer,args=('泔水',q)) #消費者們:即吃貨們 c1=Process(target=consumer,args=(q,)) c2=Process(target=consumer,args=(q,)) c1.daemon=True #如果不加守護,那麼主程序結束不了,但是加了守護之後,必須確保生產者的內容生產完並且被處理完了,所有必須還要在主程序給生產者設定join,才能確保生產者生產的任務被執行完了,並且能夠確保守護程序在所有任務執行完成之後才隨著主程序的結束而結束。 c2.daemon=True #開始 p_l=[p1,p2,p3,c1,c2] for p in p_l: p.start() p1.join() #我要確保你的生產者程序結束了,生產者程序的結束標誌著你生產的所有的人任務都已經被處理完了 p2.join() p3.join() print('主') # 主程序等--->p1,p2,p3等---->c1,c2 # p1,p2,p3結束了,證明c1,c2肯定全都收完了p1,p2,p3發到佇列的資料 # 因而c1,c2也沒有存在的價值了,不需要繼續阻塞在程序中影響主程序了。應該隨著主程序的結束而結束,所以設定成守護程序就可以了。