1. 程式人生 > >Python並發復習3 - 多進程模塊 multiprocessing

Python並發復習3 - 多進程模塊 multiprocessing

標準庫 阻塞 實例 ron lee cas group 父進程 提示

Python標準庫為我們提供了threading(多線程模塊)和multiprocessing(多進程模塊)。從Python3.2開始,標準庫為我們提供了concurrent.futures模塊,它提供了 ThreadPoolExecutor 和 ProcessPoolExecutor 兩個類,實現了對threading和multiprocessing的更高級的抽象,對編寫線程池/進程池提供了直接的支持。

核心原理是:concurrent.futures會以子進程的形式,平行的運行多個python解釋器,從而令python程序可以利用多核CPU來提升執行速度。由於子進程與主解釋器相分離,所以他們的全局解釋器鎖也是相互獨立的。每個子進程都能夠完整的使用一個CPU內核,可以利用multiprocessing實現真正的平行計算。


一 、進程的調用

1.1 函數式調用

 1 from multiprocessing import Process
 2 import time
 3 def f(name):
 4     time.sleep(1)
 5     print(hello, name,time.ctime())
 6 
 7 if __name__ == __main__:
 8     p_list=[]
 9     for i in range(3):
10         p = Process(target=f, args=(alvin,))
11         p_list.append(p)
12 p.start() 13 for i in p_list: 14 p.join() 15 print(end)

1.2 類調用

from multiprocessing import Process
import time

class MyProcess(Process):
    def __init__(self):
        super(MyProcess, self).__init__()
        #self.name = name

    def run(self):
        time.sleep(
1) print (hello, self.name,time.ctime()) if __name__ == __main__: p_list=[] for i in range(3): p = MyProcess() p.start() p_list.append(p) for p in p_list: p.join() print(end)

二 、Process類

構造方法:

Process([group [, target [, name [, args [, kwargs]]]]])

  group: 線程組,目前還沒有實現,庫引用中提示必須是None;
  target: 要執行的方法;
  name: 進程名;
  args/kwargs: 要傳入方法的參數。

實例方法:

  is_alive():返回進程是否在運行。

  join([timeout]):阻塞當前上下文環境的進程程,直到調用此方法的進程終止或到達指定的timeout(可選參數)。

  start():進程準備就緒,等待CPU調度

  run():strat()調用run方法,如果實例進程時未制定傳入target,這star執行t默認run()方法。

  terminate():不管任務是否完成,立即停止工作進程

屬性:

  daemon:和線程的setDeamon功能一樣

  name:進程名字。

  pid:進程號。

三 、進程間通訊 

1、進程對列Queue

---------- 一個流水線,各個工人共享主線程流水線產品隊列數據

2、 管道pipe

 1 from multiprocessing import Process, Pipe
 2 
 3 def func(contact):
 4     contact.send("這是管道測試信息")
 5     contact.close()
 6     
 7 if __name__ == __main__:
 8     a_con, b_con = Pipe()
 9     p = Process(target=func, args=(a_con,))
10     print(b_con.recv())
11     b_con.send("管道返回信息")

3、manage

--- Manager是一種較為高級的多進程通信方式,它能支持Python支持的的任何數據結構,適用於多個進程不是源於同一個父進程的情形。

原理是:先啟動一個ManagerServer進程,這個進程是阻塞的,它監聽一個socket,然後其他進程(ManagerClient)通過socket來連接到ManagerServer,實現通信。

 1 from multiprocessing import Process, Manager
 2 from time import sleep
 3 
 4 
 5 def thread_a_main(sync_data_pool):  # A 進程主函數,存入100+的數
 6     for ix in range(100, 105):
 7         sleep(1)
 8         sync_data_pool.append(ix)
 9 
10 
11 def thread_b_main(sync_data_pool):  # B 進程主函數,存入300+的數
12     for ix in range(300, 309):
13         sleep(0.6)
14         sync_data_pool.append(ix)
15 
16 
17 def _test_case_000():  # 測試用例
18     manager = Manager()  # multiprocessing 中的 Manager 是一個工廠方法,直接獲取一個 SyncManager 的實例
19     sync_data_pool = manager.list()  # 利用 SyncManager 的實例來創建同步數據池
20     Process(target=thread_a_main, args=(sync_data_pool, )).start()  # 創建並啟動 A 進程
21     Process(target=thread_b_main, args=(sync_data_pool, )).start()  # 創建並啟動 B 進程
22     for ix in range(6):  # C 進程(主進程)中實時的去查看數據池中的數據
23         sleep(1)
24         print(sync_data_pool)
25 
26 
27 if __main__ == __name__: 
28     _test_case_000()

四 、進程同步

 1 from multiprocessing import Process, Lock
 2 
 3 def f(l, i):
 4   
 5     with l.acquire():
 6         print(hello world %s%i)
 7 
 8 if __name__ == __main__:
 9     lock = Lock()
10 
11     for num in range(10):
12         Process(target=f, args=(lock, num)).start()

Python並發復習3 - 多進程模塊 multiprocessing