1. 程式人生 > >Python 併發程式設計(管道,事件,訊號量,程序池)

Python 併發程式設計(管道,事件,訊號量,程序池)

管道

Conn1,conn2 = Pipe()

Conn1.recv()

Conn1.send()

資料接收一次就沒有了

from multiprocessing import Process,Pipe

def f1(conn):

    from_zhujincheng = conn.recv()
    print('子程序')
    print('來自主程序的訊息:',from_zhujincheng)

if __name__ == '__main__':
    conn1,conn2 = Pipe()  #建立一個管道物件,全雙工,返回管道的兩端,但是一端傳送的訊息,只能另外一端接收,自己這一端是不能接收的
p1 = Process(target=f1,args=(conn2,)) p1.start() conn1.send('出來吧') print('主程序')

事件

E = Event()  #初識狀態是false

E.wait()  當事件物件e的狀態為false的時候,在wait的地方會阻塞程式,當物件狀態為true的時候,直接在這個wait地方繼續往下執行

E.set()  將事件物件的狀態改為true,

E.is_set() 檢視狀態

E.clear()  將事件物件的狀態改為false

from
multiprocessing import Process,Event e = Event() #建立事件物件,這個物件的初識狀態為False print('e的狀態是:',e.is_set()) # False print('程序執行到這裡了') e.set() #將e的狀態改為True print('e的狀態是:',e.is_set()) # True e.clear() #將e的狀態改為False e.wait() #e這個事件物件如果值為False,就在我加wait的地方等待 print('程序過了wait')

訊號量

S = semphore(數字),內部維護了一個計數器,acquire-1,release+1,為0的時候,其他的程序都要在acquire之前等待

S.acquire()

需要鎖住的程式碼

S.release()

import time,random
from multiprocessing import Process,Semaphore

def f1(i,s):
    s.acquire()
    print('%s男嘉賓到了'%i)
    time.sleep(random.randint(1,3))
    s.release()

if __name__ == '__main__':
    s = Semaphore(4)  #計數器4,acquire一次減一,為0 ,其他人等待,release加1
    for i in range(10):
        p = Process(target=f1,args=(i,s))
        p.start()

程序池

程序的建立和銷燬是很有消耗的,影響程式碼執行效率

在有程序池的程式碼中,主程序執行結束,程序池裡面的任務全部停止,不會等待程序池裡面的任務

pl = Pool(數字)   這個數字一般是電腦的cpu數

pl的方法:

  Map:非同步提交任務,並且傳參需要可迭代型別的資料,自帶close和join功能

import time
from multiprocessing import Process,Pool

#對比多程序和程序池的效率
def f1(n):
    for i in range(5):
        n = n + i

if __name__ == '__main__':

    #統計程序池執行100個任務的時間
    s_time = time.time()
    pool = Pool(4)  
    pool.map(f1,range(100))  
    e_time = time.time()
    dif_time = e_time - s_time

    #統計100個程序,來執行100個任務的執行時間
    p_s_t = time.time() #多程序起始時間
    p_list = []
    for i in range(100):
        p = Process(target=f1,args=(i,))
        p.start()
        p_list.append(p)
    [pp.join() for pp in p_list]
    p_e_t = time.time()
    p_dif_t = p_e_t - p_s_t
    print('程序池的時間:',dif_time)
    print('多程序的執行時間:',p_dif_t)
    # 結果:  程序池的時間: 0.40102291107177734    多程序的執行時間: 9.247529029846191     
    # 可以看出程序池執行效率遠遠大於建立多程序

  

Close : 鎖住程序池,防止有其他的新的任務在提交給程序池

  Join : 等待著程序池將自己裡面的任務都執行完

  Res = Apply(f1,args=(i,))  #同步執行任務,必須等任務執行結束才能給程序池提交下一個任務,可以直接拿到返回結果res

import time
from multiprocessing import Process,Pool

def f1(n):
    time.sleep(1)
    return n*n

if __name__ == '__main__':

    pool = Pool(4)
    for i in range(10):
        res = pool.apply(f1,args=(i,))
        print(res)

Res_obj = Apply_async(f1,args=(i,))  #非同步提交任務,可以直接拿到結果物件,從結果物件裡面拿結果,要用get方法,get方法會阻塞程式,沒有拿到結果會一直等待

import time
from multiprocessing import Process,Pool

def f1(n):
    time.sleep(0.5)
    return n*n

if __name__ == '__main__':

    pool = Pool(4)

    res_list = []
    for i in range(10):
        res = pool.apply_async(f1,args=(i,)) # 不能直接列印返回值,因為直接返回結果物件,程序還沒執行完,結果物件裡沒有資料
        res_list.append(res)

    pool.close()  
    pool.join()

    #列印結果,非同步提交之後的結果物件
    for i in res_list:
        print(i.get())

回撥函式:

 Apply_async(f1,args=(i,),callback=function)  #將前面f1這個任務的返回結果作為引數傳給callback指定的那個function函式

import os
from multiprocessing import Pool,Process

def f1(n):
    print('傳入的函式',n)
    return n*n

def call_back_func(asdf):
    print('回撥函式',asdf)

if __name__ == '__main__':
    pool = Pool(4)
    res = pool.apply_async(f1,args=(5,),callback=call_back_func)
    pool.close()
    pool.join()