Python並發復習3 - 多進程模塊 multiprocessing
Python標準庫為我們提供了threading(多線程模塊)和multiprocessing(多進程模塊)。從Python3.2開始,標準庫為我們提供了concurrent.futures模塊,它提供了 ThreadPoolExecutor 和 ProcessPoolExecutor 兩個類,實現了對threading和multiprocessing的更高級的抽象,對編寫線程池/進程池提供了直接的支持。
核心原理是:concurrent.futures會以子進程的形式,平行的運行多個python解釋器,從而令python程序可以利用多核CPU來提升執行速度。由於子進程與主解釋器相分離,所以他們的全局解釋器鎖也是相互獨立的。每個子進程都能夠完整的使用一個CPU內核,可以利用multiprocessing實現真正的平行計算。
一 、進程的調用
1.1 函數式調用
1 from multiprocessing import Process 2 import time 3 def f(name): 4 time.sleep(1) 5 print(‘hello‘, name,time.ctime()) 6 7 if __name__ == ‘__main__‘: 8 p_list=[] 9 for i in range(3): 10 p = Process(target=f, args=(‘alvin‘,)) 11 p_list.append(p)12 p.start() 13 for i in p_list: 14 p.join() 15 print(‘end‘)
1.2 類調用
from multiprocessing import Process import time class MyProcess(Process): def __init__(self): super(MyProcess, self).__init__() #self.name = name def run(self): time.sleep(1) print (‘hello‘, self.name,time.ctime()) if __name__ == ‘__main__‘: p_list=[] for i in range(3): p = MyProcess() p.start() p_list.append(p) for p in p_list: p.join() print(‘end‘)
二 、Process類
構造方法:
Process([group [, target [, name [, args [, kwargs]]]]])
group: 線程組,目前還沒有實現,庫引用中提示必須是None;
target: 要執行的方法;
name: 進程名;
args/kwargs: 要傳入方法的參數。
實例方法:
is_alive():返回進程是否在運行。
join([timeout]):阻塞當前上下文環境的進程程,直到調用此方法的進程終止或到達指定的timeout(可選參數)。
start():進程準備就緒,等待CPU調度
run():strat()調用run方法,如果實例進程時未制定傳入target,這star執行t默認run()方法。
terminate():不管任務是否完成,立即停止工作進程
屬性:
daemon:和線程的setDeamon功能一樣
name:進程名字。
pid:進程號。
三 、進程間通訊
1、進程對列Queue
---------- 一個流水線,各個工人共享主線程流水線產品隊列數據
2、 管道pipe
1 from multiprocessing import Process, Pipe 2 3 def func(contact): 4 contact.send("這是管道測試信息") 5 contact.close() 6 7 if __name__ == ‘__main__‘: 8 a_con, b_con = Pipe() 9 p = Process(target=func, args=(a_con,)) 10 print(b_con.recv()) 11 b_con.send("管道返回信息")
3、manage
--- Manager是一種較為高級的多進程通信方式,它能支持Python支持的的任何數據結構,適用於多個進程不是源於同一個父進程的情形。
原理是:先啟動一個ManagerServer進程,這個進程是阻塞的,它監聽一個socket,然後其他進程(ManagerClient)通過socket來連接到ManagerServer,實現通信。
1 from multiprocessing import Process, Manager 2 from time import sleep 3 4 5 def thread_a_main(sync_data_pool): # A 進程主函數,存入100+的數 6 for ix in range(100, 105): 7 sleep(1) 8 sync_data_pool.append(ix) 9 10 11 def thread_b_main(sync_data_pool): # B 進程主函數,存入300+的數 12 for ix in range(300, 309): 13 sleep(0.6) 14 sync_data_pool.append(ix) 15 16 17 def _test_case_000(): # 測試用例 18 manager = Manager() # multiprocessing 中的 Manager 是一個工廠方法,直接獲取一個 SyncManager 的實例 19 sync_data_pool = manager.list() # 利用 SyncManager 的實例來創建同步數據池 20 Process(target=thread_a_main, args=(sync_data_pool, )).start() # 創建並啟動 A 進程 21 Process(target=thread_b_main, args=(sync_data_pool, )).start() # 創建並啟動 B 進程 22 for ix in range(6): # C 進程(主進程)中實時的去查看數據池中的數據 23 sleep(1) 24 print(sync_data_pool) 25 26 27 if ‘__main__‘ == __name__: 28 _test_case_000()
四 、進程同步
1 from multiprocessing import Process, Lock 2 3 def f(l, i): 4 5 with l.acquire(): 6 print(‘hello world %s‘%i) 7 8 if __name__ == ‘__main__‘: 9 lock = Lock() 10 11 for num in range(10): 12 Process(target=f, args=(lock, num)).start()
Python並發復習3 - 多進程模塊 multiprocessing