(三十二)管道,事件,信號量,進程池
一、管道---Pipe
這是一個單向流動的管道,一次產生一對。看代碼:
from multiprocessing import Pipe,Process def f(c): print(c.recv())#接收不需要表明個數 if __name__ == ‘__main__‘: conn1, conn2 = Pipe()#一次產生兩個通道 p = Process(target=f, args=(conn2,)) p.start() conn1.send(‘哈哈哈哈哈akfnaslknf‘)#發送不需要使用字節流 看看源碼: def Pipe(duplex=True):#duplex:有兩部分的 return Connection(), Connection()#這裏返回了兩個
二、事件--Event
def f(e): print(‘hello,Python!!‘) e.set()#把狀態修改為True,如果不修改,程序會停不下來 if __name__ == ‘__main__‘: ev = Event()#初始狀態是False print(ev.is_set())#查看事件當前的狀態 p = Process(target=f, args=(ev,)) p.start() # ev.clear()#設置事件狀態為False
三、信號量---Semaphore
先看代碼吧:
import random
From fultiprocessing import Process,Semaphore
def f(s):
s.acquire()#這裏會-1
# time.sleep(2)#(1)
time.sleep(random.randint(1, 5))#(2)
print(‘hello‘)
s.release()#這裏會+1
if __name__ == ‘__main__‘:
s = Semaphore(4)#設置一個數量
for i in range(10):
p = Process(target=f, args=(s,))
p.start()
說明一下:信號量就像是創建了4個通道,進去一個進程會把們鎖上,別人來了沒有鑰匙進不去,只有等某個進程完成了,其他的進程才可以進來。
所以,使用(1)處的方式打印出來時,就是四個一組的被打印出來,而使用(2)處的方式打印時,就有點隨機了。這是因為,每次隨機出來的時間不一樣,所以等待的時間也不一樣,某個進程完成後其他的就可以搶奪這個通道。
四、進程池---Pool
- map()
def f(n): time.sleep(1) print(n) if __name__ == ‘__main__‘: po = Pool(4) po.map(f, range(10))#使用方式類似前面的方式,第一個參數是函數名,第二個是可叠代的參數
使用線程池,做操作比直接使用進程,要節省時間,看下代碼:
# def fn(n): # for i in range(5): # n += i # # if __name__ == ‘__main__‘: # st = time.time() # po = Pool(os.cpu_count()) # po.map(fn, range(100)) # et = time.time() # # # pst = time.time() # lst = [] # for i in range(100): # p = Process(target=fn, args=(i,)) # p.start() # lst.append(p) # [pro.join() for pro in lst] # pet = time.time() # print(et - st) # print(pet-pst)
2.進程池---apply()
def f(n): # print(n*n) return n*n if __name__ == ‘__main__‘: po = Pool(4) for i in range(10): res = po.apply(f, args=(i,))#還可以接收返回值 print(res)
這個執行順序是同步的,那麽和下面的代碼有什麽區別嗎?
def f(n): pass for i in range(10): p = Process(target=f,args=(i,)) p.start()
上面兩段代碼都是串行,但是還是有很大區別的:(1)使用進程池,其實只是創建了4個進程,而且這四個進程是一直被復用的,直到任務完成才銷毀,而下面這種卻創建了10個進程。(2)
3.進程池---apply_async()
看代碼:
def f(n): # print(n) time.sleep(1) return n**2 if __name__ == ‘__main__‘: po = Pool(4) res_lst = []#收集結果集 for i in range(10): res = po.apply_async(f, args=(i,))#也可以接收返回值,但是接收到的是結果對象,不是直接的值。 #res.get()#這個get()與隊列Queue的get()類似,不拿到值,誓不罷休,所以就會等著進程處理完返回了值後再繼續往下走, # 但是這樣循環就沒有意義了。所以把這個結果集收集起來,然後在統一處理。 res_lst.append(res) #集中處理結果集 for r in res_lst: print(r.get()) #使用close()為的是鎖住進程池,不讓別的程序繼續往這個進程池丟任務,這樣做完自己的任務後就可以結束了。 po.close()#鎖住進程池 po.join()#等待所有任務被執行完,不然主進程結束,子進程也跟著結束了
4.進程池---apply_async(),回調函數
看代碼:
def f(i): return i**2 def callBackFunc(n): print(n) if __name__ == ‘__main__‘: po = Pool(4) for i in range(10): #callback=callBackFunc這裏就是一個函數名 po.apply_async(f, args=(i,), callback=callBackFunc) po.close() po.join()
(三十二)管道,事件,信號量,進程池