1. 程式人生 > >第三十二天- 管道 程序池

第三十二天- 管道 程序池

 

1.管道

  程序間通訊(IPC)方式二:管道(不推薦使用,瞭解即可),埠易導致資料不安全的情況出現。

 1 from multiprocessing import Pipe,Process
 2 
 3 
 4 def func(conn1,conn2):
 5     msg = conn1.recv()  # 接收了conn2傳遞的
 6     # msg1 = conn2.recv()  # 接收了conn1傳遞的
 7     print('>>>',msg)
 8     # print('>>>',msg1)
 9 
10
11 if __name__ == '__main__': 12 # 拿到管道的兩端,雙工通訊方式,兩端都可以收發訊息 13 conn1,conn2 = Pipe() # 必須在Process之前產生管道 14 p = Process(target=func,args=(conn1,conn2,)) # 管道給子程序 15 p.start() 16 conn1.send('hello') 17 conn1.close() 18 conn2.send('小子') 19 conn2.close() 20 21 print
('程序結束') 22 23 # 注意管道不用了就關閉防止異常

 

 

2.共享資料

  程序之間資料共享的模組之一Manager模組(少用):

  程序間資料是獨立的,可以藉助於佇列或管道實現通訊,二者都是基於訊息傳遞的雖然程序間資料獨立,但可以通過Manager實現資料共享:

 1 from multiprocessing import Manager,Process,Lock
 2 
 3 
 4 def func1(dic,loc):
 5     # loc.acquire()  # 不加鎖易出錯
 6     dic['num
'] -= 1 7 # loc.release() 8 9 10 if __name__ == '__main__': 11 m = Manager() 12 loc = Lock() 13 dic = m.dict({'num':100}) 14 p_list = [] 15 for i in range(100): 16 p = Process(target=func1, args=(dic,loc)) 17 p_list.append(p) 18 p.start() 19 20 [pp.join() for pp in p_list] 21 22 print('>>>>>',dic['num']) 23 # 共享時不加鎖,很可能導致同一個資料被多個子程序取用,資料是不安全的,且超多程序消耗大量資源易導致卡死.
基於Manager的資料共享

  多程序共同去處理共享資料的時候,就和我們多程序同時去操作一個檔案中的資料是一樣的,不加鎖就會出現錯誤的結果,程序不安全的,所以也需要加鎖

 

總結:程序間應該儘量避免通訊,即便需要通訊,也應該選擇程序安全的工具來避免加鎖帶來的問題。

 

 

3.程序池 Pool

  建立程序需要消耗時間,銷燬程序(空間,變數,檔案資訊等等的內容)也需要消耗時間。開啟成千上萬的程序,作業系統無法讓他們同時執行,維護一個很大的程序列表的同時,排程的時候,還需要進行切換並且記錄每個程序的執行節點,也就是記錄上下文(各種變數等等亂七八糟的東西,雖然你看不到,但是作業系統都要做),這樣反而會影響程式的效率。因此我們不能無限制的根據任務開啟或者結束程序,這就需要用到程序池:

  定義一個池子,在裡面放上固定數量的程序,有需求來了,就拿一個池中的程序來處理任務,等到處理完畢,程序並不關閉,而是將程序再放回程序池中繼續等待任務。如果有很多工需要執行,池中的程序數量不夠,任務就要等待之前的程序執行任務完畢歸來,拿到空閒程序才能繼續執行。也就是說,池中程序的數量是固定的,那麼同一時間最多有固定數量的程序在執行。這樣不會增加作業系統的排程難度,還節省了開閉程序的時間,也一定程度上能夠實現併發效果。

 

建立方法:

 Pool([numprocess [,initializer [, initargs]]]):建立程序池

引數介紹:

1 numprocess:要建立的程序數,如果省略,將預設使用cpu_count()的值
2 initializer:是每個工作程序啟動時要執行的可呼叫物件,預設為None
3 initargs:是要傳給initializer的引數組

常用方法:

p.apply(func [, args [, kwargs]]):在一個池工作程序中執行func(*args,**kwargs),然後返回結果。
'''需要強調的是:此操作並不會在所有池工作程序中並執行func函式。如果要通過不同引數併發地執行func函式,必須從不同執行緒呼叫p.apply()函式或者使用p.apply_async()'''

p.apply_async(func [, args [, kwargs]]):在一個池工作程序中執行func(*args,**kwargs),然後返回結果。
'''此方法的結果是AsyncResult類的例項,callback是可呼叫物件,接收輸入引數。當func的結果變為可用時,將理解傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他非同步操作中的結果。'''
    
p.close():關閉程序池,防止進一步操作。如果所有操作持續掛起,它們將在工作程序終止前完成

P.jion():等待所有工作程序退出。此方法只能在close()或teminate()之後呼叫

主要方法介紹

 

 1 import time
 2 from multiprocessing import Process,Pool
 3 
 4 
 5 def func1(i):
 6     num = 0
 7     for j in range(5):
 8         num += i
 9 
10 
11 if __name__ == '__main__':
12     pool = Pool(6)  # 建立程序池
13 
14     p_list = []
15     start_time = time.time()
16     for i in range(500):
17         p = Process(target=func1,args=(i,))
18         p_list.append(p)
19         p.start()
20 
21     [pp.join() for pp in p_list]
22     end_time = time.time()
23     print('耗時:',end_time-start_time)
24 
25     s_time = time.time()
26     pool.map(func1,range(500))  # map
27     e_time = time.time()
28     print('耗時:',e_time - s_time)  # 耗時遠遠小於直接開500程序
程序池 簡單應用

 

apply同步方法:

 1 from multiprocessing import Process,Pool
 2 import time
 3 
 4 
 5 def func1(i):
 6     num = 0
 7     for j in range(3):
 8         num += i
 9     time.sleep(1)
10     print(num)
11     return num
12 
13 
14 if __name__ == '__main__':
15     pool = Pool(6)
16 
17     for i in range(10):
18         res = pool.apply(func1,args=(i,))  # apply 程序同步/序列方法 效率低,不常用
19         # print(res)
apply 程序同步/序列方法

 

apply_async非同步方法:

 1 from multiprocessing import Process,Pool
 2 import time
 3 
 4 
 5 def func1(i):
 6     num = 0
 7     for j in range(5):
 8         num += i
 9     time.sleep(1)
10     # print('>>>>>',num)
11     return num
12 
13 
14 if __name__ == '__main__':
15     pool = Pool(6)
16 
17     red_list = []
18     for i in range(10):
19         res = pool.apply_async(func1,args=(i,))
20         red_list.append(res)
21 
22     pool.close()  # 不是關閉,只是鎖定程序池,告訴主程序不會再新增資料進去
23     pool.join()  # 等待子程式執行完
24 
25     for ress in red_list:
26         print(ress.get())  # get方法取出返回值num 按新增順序取出已儲存在快取區的結果 所以是順序打印出的
View Code

 

回撥函式:運用時注意一點,回撥函式的形參執行有一個,如果你的執行函式有多個返回值,那麼也可以被回撥函式的這一個形參接收,接收的是一個元祖,包含著你執行函式的所有返回值。

 1 from multiprocessing import Pool,Process
 2 import time,os
 3 
 4 
 5 def func1(n):
 6     # print('子程序的pid:',os.getpid())
 7     return n*n
 8 
 9 
10 def func2(i):
11     res = i**2
12     # print('callback的pid:',os.getpid())
13     print(res)
14     return res
15 
16 
17 if __name__ == '__main__':
18     pool = Pool(4)
19     pool.apply_async(func1,args=(3,),callback=func2)  # callback把前面的返回值作引數傳給後面
20     # print('主程序的pid:',os.getpid())  # 主程序執行了callback
21     pool.close()
22     pool.join()
回撥函式 callback

 

 

4.總結

  程序之間的通訊:佇列、管道、資料共享也算

  訊號量和事件也相當於鎖,也是全域性的,所有程序都能拿到這些鎖的狀態,程序之間這些鎖啊訊號量啊事件啊等等的通訊,其實底層還是socekt,只不過是基於檔案的socket通訊,而不是跟上面的資料共享啊空間共享啊之類的機制,我們之前學的是基於網路的socket通訊,socket的兩個家族,一個檔案的一個網路的,所以如果說這些鎖之類的報錯,可能你看到的就是類似於socket的錯誤。工作中常用的是鎖,訊號量和事件不常用,但是訊號量和事件面試的時候會問到(做了解)