Pipe(管道),Event(事件),Semaphore(訊號量),Pool(程序池),回撥函式
一、關於Pipe(管道,佇列就是基於管道),不常用,因為管道中的內容是共享的,資料不安全,而且一個數據取走後,其他人沒法接收.
from multiprocessing import Process,Pipe def f1(conn): from_zhujincheng = conn.recv() print('我是子程序') print('來自主程序的訊息:',from_zhujincheng) if __name__ == '__main__': conn1,conn2 = Pipe() #建立一個管道物件,全雙工,返回管道的兩端,但是一端傳送的訊息,只能另外一端接收,自己這一端是不能接收的#可以將一端或者兩端傳送給其他的程序,那麼多個程序之間就可以通過這一個管道進行通訊了 p1 = Process(target=f1,args=(conn2,)) p1.start() conn1.send('小寶貝,你在哪') print('我是主程序')
二、關Event() (簡稱事件)
e = Event() #Event的狀態預設為False
e.set() #將e的狀態改為True
e.clear() #將e的狀態還原會False
e.wait() #e的狀態是False,程式就會阻塞在這裡等待.
注意:Event一般只應用於2個程序間的通訊,程序多了,無法成立改機制.
from multiprocessing import Process,Event e = Event() #建立事件物件,這個物件的初識狀態為False print('e的狀態是:',e.is_set()) print('程序執行到這裡了') e.set() #將e的狀態改為True print('e的狀態是:',e.is_set()) e.clear() #將e的狀態改為False e.wait() #e這個事件物件如果值為False,就在我加wait的地方等待 print('程序過了wait')
三、關於multiprocessing中的Semaphore()方法(簡稱訊號量)
Semaphore是一個內建的計數器,也是一把鎖,可以設定同一時間段內,開鎖的程序數,(出去一個進去一個),預設是1個程序
跟Lock()函式相比,就是可以設定人數限制,同時執行某段程式碼
s = semphore(4)
s.acquire() #加鎖
需要鎖住的程式碼
s.release() #解鎖
from multiprocessing import Process,Semaphore import time import random def f1(i,s): s.acquire() print("%s男士到了"%i) time.sleep(random.randint(1,3)) s.release() if __name__ == '__main__': s = Semaphore(2) #鎖,引數是同時可以開鎖的程序數,此方法沒有with 簡便方法 for i in range(4): P = Process(target=f1,args=(i,s)) P.start()
四、關於程序池
from multiprocssing import Pool
Pool.map(obj,inter) #快速生成n項(可迭代項)任務,交給程序池,屬於非同步提交方式.引數1為開啟程序的物件,引數2必須是可迭代的,把後面的可迭代物件給前面的程序程式碼執行,快速生成 n項(可迭代項)任務
Pool方法就是在一直開啟著程序,每次執行任務時,就把任務放進程序池,(自帶join方法:後面的任務必須等著前面的任務執行完才能進入).該池子的預設值數量為電腦cpu的個數;
這個跟多程序比較,就是節省建立和銷燬程序的時間.
#用多程序執行任務,慢在了,建立程序,和執行完任務後的銷燬程序.而程序池,在池子裡一直開著程序,執行完任務,但是不關程序通道.
使用程序池運算和建立程序運算,時間的對比
import time from multiprocessing import Process,Pool def f1(n): for i in range(5): n = n+i if __name__ == '__main__': #使用程序池的執行程式碼所需時間 s_time = time.time() pool=Pool(4) #有100個任務,但是隻有4個程序能執行. pool.map(f1,range(100)) #引數1為開啟程序的物件,引數2必須是可迭代的,此map跟普通的map一樣 e_time = time.time() dif_time = e_time - s_time #使用多程序執行程式碼所需的時間 p_s_t = time.time() #起始時間 p_list = [] for i in range(100): p = Process(target=f1,args=(i,)) p.start() p_list.append(p) [pp.join() for pp in p_list] p_e_t = time.time() p_dif_t = p_e_t - p_s_t print("程序池的時間:",dif_time) print("多程序的執行時間:",p_dif_t) #列印結果為:每次結果都不一致,但是程序池用比多程序用的時間短,是一定的. #程序池的時間: 0.1326451301574707 #多程序的執行時間: 2.274912118911743
程序池的同步提交方式
pool = Pool()
res = pool.apply(obj,args=(i,)) #同步執行任務,必須等任務執行結束才能給程序池提交下一個任務,可以直接拿到返回結果res
#程序池的同步非同步方法 import time from multiprocessing import Pool def f1(n): time.sleep(0.5) return n*n if __name__ == '__main__': pool = Pool(4) for i in range(10): res = pool.apply(f1,args=(i,)) #同步方法,就是把它該成序列,可以接收返回值 print(res)
程序池的非同步提交方式
pool.apply_async(obj,args(i,)) #非同步提交
因為程序池比較特殊,主程式不會等待著程序池執行完畢才結束,但是主程式結束,程序池也就結束了,所以需要主程序等待程序池,
但是你無法判斷是否還有其他任務提交給程序池,所以要凍結住程序池(poool.close()),然後再讓主程式等待程序池(pool.join())
import time from multiprocessing import Process,Pool def f1(n): time.sleep(0.5) return n*n if __name__ == '__main__': pool = Pool(4) res_list = [] for i in range(10): res = pool.apply_async(f1,args=(i,)) print(res) #任務瞬間提交出去,但是程式還沒有計算出結果,這裡列印的就是一堆結果物件(一堆記憶體地址) res_list.append(res) for i in res_list: print(i.get()) print("主程序結束") #解釋:為什麼打印出來的結果是有序的? # 因為放進列表裡面的時候,是有順序的,你get()的時候,用的列表裡面的資料 # 為什麼結果是四個四個的出來? # 因為程序池的大小為4
需求:瞬間獲取到所有程序執行完後的資料
import time from multiprocessing import Pool def f1(n): time.sleep(0.5) return n*n if __name__ == '__main__': pool = Pool(4) res_list = [] #非同步提交的方法 for i in range(10): res = pool.apply_async(f1,args=(i,)) print(res) #任務瞬間提交出去,但是程式還沒有計算出結果,這裡列印的就只是一堆還沒有結果的,結果物件(一堆記憶體地址) res_list.append(res) #鎖住程序池,不再讓其他程式往裡面扔新的任務,確保沒有新的任務提交. pool.close() pool.join() for r in res_list: print(r.get()) time.sleep(2) #主程序執行結束,程序池裡面的任務全部停止,不會等待程序池裡面的任務 print("主程序結束")
五、回撥函式
什麼是回撥函式:一個子程序執行完得到的返回值,扔給另一個程序去處理,這個過程叫回調,接收返回值進行處理的函式,叫做回撥函式.
注意:回撥函式是在主程序裡面執行的.
import os from multiprocessing import Pool,Process def f1(n): print("引數值為",n) return n*n def call_back_full(s): print("call程序id",os.getpid()) print("回撥函式的結果:",s) if __name__ == '__main__': pool = Pool(4) res = pool.apply_async(f1,args=(5,),callback=call_back_full) #callback 譯為,回撥 pool.close() pool.join() print("主程序id",os.getpid())