1. 程式人生 > >Pipe(管道),Event(事件),Semaphore(訊號量),Pool(程序池),回撥函式

Pipe(管道),Event(事件),Semaphore(訊號量),Pool(程序池),回撥函式

一、關於Pipe(管道,佇列就是基於管道),不常用,因為管道中的內容是共享的,資料不安全,而且一個數據取走後,其他人沒法接收.

from multiprocessing import Process,Pipe
def f1(conn):
    from_zhujincheng = conn.recv()
    print('我是子程序')
    print('來自主程序的訊息:',from_zhujincheng)
if __name__ == '__main__':
    conn1,conn2 = Pipe()  #建立一個管道物件,全雙工,返回管道的兩端,但是一端傳送的訊息,只能另外一端接收,自己這一端是不能接收的
#可以將一端或者兩端傳送給其他的程序,那麼多個程序之間就可以通過這一個管道進行通訊了 p1 = Process(target=f1,args=(conn2,)) p1.start() conn1.send('小寶貝,你在哪') print('我是主程序')

 

二、關Event()  (簡稱事件)

e = Event()  #Event的狀態預設為False

e.set() #將e的狀態改為True

e.clear()  #將e的狀態還原會False

e.wait()  #e的狀態是False,程式就會阻塞在這裡等待.

注意:Event一般只應用於2個程序間的通訊,程序多了,無法成立改機制.

from multiprocessing import Process,Event
e = Event()  #建立事件物件,這個物件的初識狀態為False
print('e的狀態是:',e.is_set())
print('程序執行到這裡了')
e.set()  #將e的狀態改為True
print('e的狀態是:',e.is_set())
e.clear()  #將e的狀態改為False
e.wait()  #e這個事件物件如果值為False,就在我加wait的地方等待
print('程序過了wait')

 

三、關於multiprocessing中的Semaphore()方法(簡稱訊號量)

Semaphore是一個內建的計數器,也是一把鎖,可以設定同一時間段內,開鎖的程序數,(出去一個進去一個),預設是1個程序

  跟Lock()函式相比,就是可以設定人數限制,同時執行某段程式碼

s = semphore(4)

s.acquire() #加鎖

  需要鎖住的程式碼

s.release() #解鎖

from multiprocessing import Process,Semaphore
import time
import random
def f1(i,s):
    s.acquire()
    print("%s男士到了"%i)
    time.sleep(random.randint(1,3))
    s.release()
if __name__ == '__main__':
    s = Semaphore(2)  #鎖,引數是同時可以開鎖的程序數,此方法沒有with 簡便方法
    for i in range(4):
        P = Process(target=f1,args=(i,s))
        P.start()

四、關於程序池

from multiprocssing import Pool

Pool.map(obj,inter) #快速生成n項(可迭代項)任務,交給程序池,屬於非同步提交方式.引數1為開啟程序的物件,引數2必須是可迭代的,把後面的可迭代物件給前面的程序程式碼執行,快速生成 n項(可迭代項)任務
Pool方法就是在一直開啟著程序,每次執行任務時,就把任務放進程序池,(自帶join方法:後面的任務必須等著前面的任務執行完才能進入).該池子的預設值數量為電腦cpu的個數;
這個跟多程序比較,就是節省建立和銷燬程序的時間.
    #用多程序執行任務,慢在了,建立程序,和執行完任務後的銷燬程序.而程序池,在池子裡一直開著程序,執行完任務,但是不關程序通道.
使用程序池運算和建立程序運算,時間的對比
import
time from multiprocessing import Process,Pool def f1(n): for i in range(5): n = n+i if __name__ == '__main__': #使用程序池的執行程式碼所需時間 s_time = time.time() pool=Pool(4) #有100個任務,但是隻有4個程序能執行. pool.map(f1,range(100)) #引數1為開啟程序的物件,引數2必須是可迭代的,此map跟普通的map一樣 e_time = time.time() dif_time = e_time - s_time #使用多程序執行程式碼所需的時間 p_s_t = time.time() #起始時間 p_list = [] for i in range(100): p = Process(target=f1,args=(i,)) p.start() p_list.append(p) [pp.join() for pp in p_list] p_e_t = time.time() p_dif_t = p_e_t - p_s_t print("程序池的時間:",dif_time) print("多程序的執行時間:",p_dif_t) #列印結果為:每次結果都不一致,但是程序池用比多程序用的時間短,是一定的. #程序池的時間: 0.1326451301574707 #多程序的執行時間: 2.274912118911743

 

程序池的同步提交方式

pool = Pool()

res = pool.apply(obj,args=(i,))  #同步執行任務,必須等任務執行結束才能給程序池提交下一個任務,可以直接拿到返回結果res

#程序池的同步非同步方法
import time
from multiprocessing import Pool
def f1(n):
    time.sleep(0.5)
    return n*n
if __name__ == '__main__':
    pool = Pool(4)
    for i in range(10):
        res = pool.apply(f1,args=(i,)) #同步方法,就是把它該成序列,可以接收返回值
        print(res)

 

程序池的非同步提交方式

pool.apply_async(obj,args(i,)) #非同步提交

因為程序池比較特殊,主程式不會等待著程序池執行完畢才結束,但是主程式結束,程序池也就結束了,所以需要主程序等待程序池,

但是你無法判斷是否還有其他任務提交給程序池,所以要凍結住程序池(poool.close()),然後再讓主程式等待程序池(pool.join())

import time
from multiprocessing import Process,Pool
def f1(n):
    time.sleep(0.5)
    return n*n
if __name__ == '__main__':
    pool = Pool(4)
    res_list = []
    for i in range(10):
        res = pool.apply_async(f1,args=(i,))
        print(res) #任務瞬間提交出去,但是程式還沒有計算出結果,這裡列印的就是一堆結果物件(一堆記憶體地址)
        res_list.append(res)
    for i in res_list:
        print(i.get())
    print("主程序結束")
#解釋:為什麼打印出來的結果是有序的?
    # 因為放進列表裡面的時候,是有順序的,你get()的時候,用的列表裡面的資料
    #  為什麼結果是四個四個的出來?
    #  因為程序池的大小為4
需求:瞬間獲取到所有程序執行完後的資料
import
time from multiprocessing import Pool def f1(n): time.sleep(0.5) return n*n if __name__ == '__main__': pool = Pool(4) res_list = [] #非同步提交的方法 for i in range(10): res = pool.apply_async(f1,args=(i,)) print(res) #任務瞬間提交出去,但是程式還沒有計算出結果,這裡列印的就只是一堆還沒有結果的,結果物件(一堆記憶體地址) res_list.append(res) #鎖住程序池,不再讓其他程式往裡面扔新的任務,確保沒有新的任務提交. pool.close() pool.join() for r in res_list: print(r.get()) time.sleep(2) #主程序執行結束,程序池裡面的任務全部停止,不會等待程序池裡面的任務 print("主程序結束")

五、回撥函式

什麼是回撥函式:一個子程序執行完得到的返回值,扔給另一個程序去處理,這個過程叫回調,接收返回值進行處理的函式,叫做回撥函式.

注意:回撥函式是在主程序裡面執行的.

import os
from multiprocessing import Pool,Process
def f1(n):
    print("引數值為",n)
    return n*n
def call_back_full(s):
    print("call程序id",os.getpid())
    print("回撥函式的結果:",s)
if __name__ == '__main__':
    pool = Pool(4)
    res = pool.apply_async(f1,args=(5,),callback=call_back_full)  #callback 譯為,回撥
    pool.close()
    pool.join()
    print("主程序id",os.getpid())