1. 程式人生 > >(三十二)管道,事件,信號量,進程池

(三十二)管道,事件,信號量,進程池

pen 字節 semaphore and als lex 當前 復用 main

一、管道---Pipe

這是一個單向流動的管道,一次產生一對。看代碼:

from multiprocessing import Pipe,Process
def f(c):
    print(c.recv())#接收不需要表明個數
if __name__ == ‘__main__‘:
    conn1, conn2 = Pipe()#一次產生兩個通道
    p = Process(target=f, args=(conn2,))
    p.start()
    conn1.send(‘哈哈哈哈哈akfnaslknf‘)#發送不需要使用字節流

看看源碼:
def Pipe(duplex=True):#duplex:有兩部分的
    return Connection(), Connection()#這裏返回了兩個

  

二、事件--Event

def f(e):
    print(‘hello,Python!!‘)
    e.set()#把狀態修改為True,如果不修改,程序會停不下來
if __name__ == ‘__main__‘:
    ev = Event()#初始狀態是False
    print(ev.is_set())#查看事件當前的狀態
    p = Process(target=f, args=(ev,))
    p.start()
    # ev.clear()#設置事件狀態為False

  

三、信號量---Semaphore

先看代碼吧:

import random

From fultiprocessing import Process,Semaphore
def f(s):
s.acquire()#這裏會-1
# time.sleep(2)#(1)
time.sleep(random.randint(1, 5))#(2)
print(‘hello‘)
s.release()#這裏會+1

if __name__ == ‘__main__‘:
s = Semaphore(4)#設置一個數量
for i in range(10):
p = Process(target=f, args=(s,))
p.start()

說明一下:信號量就像是創建了4個通道,進去一個進程會把們鎖上,別人來了沒有鑰匙進不去,只有等某個進程完成了,其他的進程才可以進來。

所以,使用(1)處的方式打印出來時,就是四個一組的被打印出來,而使用(2)處的方式打印時,就有點隨機了。這是因為,每次隨機出來的時間不一樣,所以等待的時間也不一樣,某個進程完成後其他的就可以搶奪這個通道。

四、進程池---Pool

  1. map()

 def f(n):
    time.sleep(1)
    print(n)
if __name__ == ‘__main__‘:
    po = Pool(4)
    po.map(f, range(10))#使用方式類似前面的方式,第一個參數是函數名,第二個是可叠代的參數

  

使用線程池,做操作比直接使用進程,要節省時間,看下代碼:

# def fn(n):
#     for i in range(5):
#         n += i
#
# if __name__ == ‘__main__‘:
#     st = time.time()
#     po = Pool(os.cpu_count())
#     po.map(fn, range(100))
#     et = time.time()
#
#
#     pst = time.time()
#     lst = []
#     for i in range(100):
#         p = Process(target=fn, args=(i,))
#         p.start()
#         lst.append(p)
#     [pro.join() for pro in lst]
#     pet = time.time()
#     print(et - st)
#     print(pet-pst)

  2.進程池---apply()

def f(n):
    # print(n*n)
    return n*n
if __name__ == ‘__main__‘:
    po = Pool(4)
    for i in range(10):
        res = po.apply(f, args=(i,))#還可以接收返回值
        print(res)

  

這個執行順序是同步的,那麽和下面的代碼有什麽區別嗎?

def f(n):
    pass
for i in range(10):
    p = Process(target=f,args=(i,))
    p.start()

  

上面兩段代碼都是串行,但是還是有很大區別的:(1)使用進程池,其實只是創建了4個進程,而且這四個進程是一直被復用的,直到任務完成才銷毀,而下面這種卻創建了10個進程。(2

3.進程池---apply_async()

看代碼:

def f(n):
    # print(n)
    time.sleep(1)
    return n**2
if __name__ == ‘__main__‘:
    po = Pool(4)
    res_lst = []#收集結果集
    for i in range(10):
        res = po.apply_async(f, args=(i,))#也可以接收返回值,但是接收到的是結果對象,不是直接的值。
        #res.get()#這個get()與隊列Queue的get()類似,不拿到值,誓不罷休,所以就會等著進程處理完返回了值後再繼續往下走,
        # 但是這樣循環就沒有意義了。所以把這個結果集收集起來,然後在統一處理。
        res_lst.append(res)
    #集中處理結果集
    for r in res_lst:
        print(r.get())
    #使用close()為的是鎖住進程池,不讓別的程序繼續往這個進程池丟任務,這樣做完自己的任務後就可以結束了。
    po.close()#鎖住進程池
    po.join()#等待所有任務被執行完,不然主進程結束,子進程也跟著結束了

 4.進程池---apply_async(),回調函數

看代碼:

def f(i):
    return i**2
def callBackFunc(n):
    print(n)
if __name__ == ‘__main__‘:
    po = Pool(4)
    for i in range(10):
	#callback=callBackFunc這裏就是一個函數名
        po.apply_async(f, args=(i,), callback=callBackFunc)
    po.close()
    po.join()

  

(三十二)管道,事件,信號量,進程池