1. 程式人生 > >多工處理方式之一:多程序

多工處理方式之一:多程序

#### 程序的理解: - 1、系統進行**資源分配**和**排程**的基本單位,一個**具有一定獨立功能**的**程式**關於**某個資料集合**的一次**執行活動**; - 2、它是一個**動態的概念**,一個**活動的實體**; - 狹義定義:`an instance of a computer program that is being executed` 即**正在執行的程式的例項化物件**。 - 廣義定義:程序是一個具有一定獨立功能的程式關於某個資料集合的一次執行活動,是**作業系統進行資源分配和排程的基本單位**,是**作業系統動態執行的基本單元**。 - **注:其概念的關鍵點在於** ```bash 1)、程序是一個實體(動態的),具有自己獨立的地址空間,包括: 文字區域(text region):儲存處理器執行的程式碼; 資料區域(data region):儲存變數與程序執行期間使用的動態分配的記憶體; 堆疊(stack region):儲存的是程式執行過程中呼叫的指令與本地變數; 注:正是由於每個程序是一個獨立的實體,其中以上所述的三個區域,即每個程序的資料區域以及堆疊是獨立的,相互隔離的,所以在多程序中可以保證資料的安全性 ``` ```bash 2)、編寫完的程式碼,沒有執行時,稱為程式, 正在執行的程式碼,稱為程序 程式是死的(靜態的),程序是活的(動態的) ``` - 3、程序的**三大狀態** - (1) **就緒(Ready)狀態** 程序建立完成即其他所有資源都已分配完畢,等待cpu排程執行時,稱為就緒狀態。 - (2) **執行(Running)狀態** cpu開始執行該程序時稱為執行狀態。 - (3) **阻塞(Blocked)狀態** 由於等待某個事件發生而無法執行時,便是阻塞狀態,cpu執行其他程序.例如,等待I/O完成input、申請緩衝區不能滿足等等。 **如圖所示** ![](https://img2020.cnblogs.com/blog/2001508/202008/2001508-20200801022842057-1422798008.png) **** #### CPU 排程程序的方式 - 先來先服務**fcfs**(first come first server):**先來的先執行** - **短作業優先演算法**:分配的cpu多,先把短的算完 - **時間片輪轉演算法**:每一個任務就執行一個時間片的時間.然後就執行其他的 - **多級反饋佇列演算法** - 越是時間長的,cpu分配的資源越少,優先順序靠後 - 越是時間短的,cpu分配的資源越多 **** #### 建立程序 **匯入`multiprocessing模組中的Process類`以供後續建立類的時候直接呼叫** `p = Process(target = func, name = process01, args=(5,))` 例項化程序物件 ##### Process 類引數介紹 > - target = func 表示呼叫物件,即子程序要執行的任務 func > - args 表示任務 func 的位置引數元組,args=(5, ) > - name = process01 為子程序的名稱 ##### Process 類常⽤⽅法 > - p.start( ): 啟動程序,並呼叫該子程序中的p.run( ) > - p.run( ): 程序啟動時執行的方法,正是它去呼叫target指定的函式,我們自定義類的類中一定要寫入該方法 > - p.terminate( ): 強制終止程序p,不會進行任何清理操作 > - p.is_alive( ): 如果p仍然執行,返回True。用來判斷程序是否還在執行 > - p.join([timeout]): 主程序等待子程序p終止,timeout是可選的等待時間 ```python # 主程序速度快於子程序,join方法可以使得子程序執行結束後,再繼續執行主程序中的程式碼,可以用來同步程式碼的一致性 import multiprocessing def func(): print("傳送第一份郵件") if __name__ == "__main__": p = multiprocessing.Process(target=func) p.start() p.join() print("傳送第二份郵件") # 傳送第一份郵件 # 傳送第二份郵件 ``` ```python # 多個子程序配合 join 方法實現非同步併發 import multiprocessing def func(index): print(f"傳送第{index}封郵件") if __name__ == "__main__": process_list = [] for i in range(10): p = multiprocessing.Process(target=func, args=(i, )) p.start() process_list.append(p) # p.join() 程式會變成同步阻塞 for i in process_list: i.join() # 非同步併發 print("主程序發最後一封郵件!") ``` ##### Process類常⽤屬性 > - name: 當前程序例項別名, 預設為Process-N, N為從1開始遞增的整 > 數 > - pid: 當前程序例項的ID值 ##### 建立程序的兩種方法 ```python # 建立程序的方法一: # 利用multiprocessing模組提供一個Process類來建立一個程序物件 from multiprocessing import Process import time def func(n): while n > 0: print(n) time.sleep(3) n -= 1 if __name__ == "__main__": p = Process(target = func, args=(5,)) p.start() p.join() ``` ```python # 建立程序的方法二: # 建立新的程序可以自定義一個類去繼承Process類,每次例項化這個類的時候,就等同於例項化一個程序物件 import multiprocessing import time class ClockProcess(multiprocessing.Process): def run(self): n = 5 while n > 0: print(n) time.sleep(3) n -= 1 if __name__ == "__main__": p = ClockProcess() p.start() p.join() ``` **** #### 守護程序 > - 守護 **主程序** 時,如果主程序執行結束了,意味著守護程序的壽命立刻終止.立刻殺死 > - 語法: > - **程序物件.daemon = True** 設定**當前程序**為守護程序 > - **必須寫在start( )呼叫程序之前進行設定** > - 預設情況下,**主程序會等待所有子程序執行完畢之後,關閉程式,釋放資源**。若不等待,子程序並不方便管理,容易造成殭屍程序,在後臺不停的佔用系統的資源(cpu和記憶體),不清楚程序的來源。 > - 守護主程序即在主程序程式碼執行結束之後,無需等待子程序執行,立即殺死程式 > > ```python > import multiprocessing > > > def func(): > print("start 當前子程序") > print("end 當前子程序") > > > if __name__ == "__main__": > p = multiprocessing.Process(target=func) > p.daemon = True > p.start() > print("主程序執行結束 ... ") > > # 主程序執行結束 ... > > > ``` > **多個子程序下,未守護主程序,主程序仍會等待子程序執行結束** > > - **守護程序的實際用途:監控報活** > > ``` > import time > > > # 監控報活 > def alive(): > while True: > print("給監控伺服器發訊息, 當前5號伺服器功能正常 i am ok ~") > time.sleep(1) > > > # 當前伺服器正常完成的功能 > def func(): > time.sleep(5) > print("當前5號伺服器功能,統計財務報表~") > > > if __name__ == "__main__": > p1 = Process(target=func) > p2 = Process(target=alive) > > # 守護p2程序 > p2.daemon = True > > p1.start() > p2.start() > > # 等待p1子程序執行結束之後,下面的主程式的程式碼才會放行; > p1.join() > > # 未守護主程序,主程序會預設等待 > print("當前伺服器狀態:統計財務報表功能異常.....") > > > # 給監控伺服器發訊息, 當前5號伺服器功能正常 i am ok ~ > # 給監控伺服器發訊息, 當前5號伺服器功能正常 i am ok ~ > # 給監控伺服器發訊息, 當前5號伺服器功能正常 i am ok ~ > # 給監控伺服器發訊息, 當前5號伺服器功能正常 i am ok ~ > # 給監控伺服器發訊息, 當前5號伺服器功能正常 i am ok ~ > # 當前5號伺服器功能,統計財務報表~ > # 給監控伺服器發訊息, 當前5號伺服器功能正常 i am ok ~ > # 當前伺服器狀態:統計財務報表功能異常..... > ``` **** #### 多工處理方式一:多程序 ##### 建立多程序的兩種方式: ```python # 手動建立 from multiprocessing import Process num = 1 def run1(): global num num += 5 print("子程序1執行中,num = %d" % (num)) def run2(): global num num += 10 print("子程序2執行中,num = %d" % (num)) if __name__ == "__main__": print("父程序啟動") p1 = Process(target=run1) p2 = Process(target=run2) print("子程序將要執行") p1.start() p2.start() p1.join() p2.join() print("子程序結束") ``` ```python # 藉助舊版程序池建立多程序 from multiprocessing import Pool import random import time def work(num): print(random.random() * num) time.sleep(3) if __name__ == "__main__": # 例項化程序池物件,設定同一時間內最多可以執行的程序數為3個 # 題中的10個任務都由程序池中的這三個程序輪詢執行,不會建立額外 的程序數 # 若不指定則同一時間內可以執行的程序個數預設為cpu邏輯核心數 p = Pool(3) for i in range(10): # apply_async 選擇要呼叫的任務,每次迴圈出來的任務會用閒下來的子程序去執行 # 使⽤⾮阻塞⽅式調⽤func(並⾏執⾏,阻塞⽅式必須為等待上⼀個程序退出後才能執⾏下⼀個程序), args為傳遞給func的引數列表,kwargs為傳遞給func的關鍵字引數列表; p.apply_async(work, (i,)) # 程序池關閉之後不會再接受新的請求 p.close() # 等待程序池中的所有子程序都結束 p.join() # 多程序中,主程序一般用來等待子程序執行完畢,真正的任務都由子程序中執行 ``` ```python # 藉助新版程序池建立多程序 from concurrent.futures import ProcessPoolExecutor import os import time def func(i): print("任務執行中... start", os.getpid()) time.sleep(10) print("任務結束... end", i) return i # ProcessPoolExecutor 程序池基本使用 """ 預設如果一個程序短時間內可以完成更多的任務,就不會建立額外的新的程序,以節省資源 """ if __name__ == "__main__": lst = [] print(os.cpu_count()) # cpu邏輯核心數 # 建立程序池物件 """程序池中預設最多建立cpu這麼多個程序,所有任務全由這幾個程序完成,不會額外建立程序""" p = ProcessPoolExecutor() # 非同步提交任務 for i in range(10): res = p.submit(func, i) lst.append(res) # 獲取當前程序池返回值 for i in lst: print(i.result()) # 等待所有子程序執行結束 p.shutdown() # join print("主程式執行結束....") ``` **** #### 程序間通訊 **程序間資料不共享,他們之間進行資料傳遞即為通訊** `from multiprocessing import Queue` > 藉助程序佇列`Queue`完成程序間的通訊 ##### Queue 的基本使用 > - 訊息佇列遵循 **先進先出** 的原則 ###### 初始化 > - 初始化Queue()物件時(q=Queue()),若**括號中沒有指定最⼤可接收 > 的訊息數量**, 或**數量為負值**, 那麼就代表可接受的訊息數量沒有上限 ###### 入隊操作(存資料) > - `q = Queue()` > - `q.put(item, [block[, timeout]])`:**將item訊息寫⼊佇列** > - block 預設值為True > - 如果block 使⽤預設值,且沒有設定timeout(單位秒)時,若訊息列隊已經沒有空間可寫⼊,此時程式將被阻塞(停在寫⼊狀態) ,直到從訊息列隊騰出空間為⽌,如果設定了True和timeout,則會等待timeout秒,若還沒空間,則拋 出"q.Full"的異常資訊 > - 如果block值為False, 訊息列隊如果出現沒有空間可寫⼊的情況, 則會⽴刻丟擲"q.Full"**滿了**異常 > - `q.put_nowait(item)`: 相當`q.put(item, False)`; ###### 出隊操作(取資料) >- `q.get([block[, timeout]])`:獲取佇列中的⼀條訊息, 然後將其從列隊中移除 >- block預設值為True > - 如果block使⽤預設值,且沒有設定timeout(單位秒),訊息列隊如果為空, 此時程式將被阻塞(停在讀取狀態),直到從訊息列隊讀到訊息為⽌, > - 如果設定了timeout, 則會等待timeout秒, 若還沒讀取到任何訊息, 則拋 > 出"q.Empty"異常 > >- 如果block值為False,訊息列隊如果為空,則會⽴刻丟擲“q.Empty”**空的**異常 > >- `q.get_nowait()`:相當q.get(False) ###### 其他操作 > - q = Queue() > - q.qsize(): 返回當前佇列包含的訊息數量 > - q.empty(): 如果佇列為空, 返回True, 反之False > - q.full(): 如果佇列滿了, 返回True,反之False ##### python程式碼實現 ```python from multiprocessing import Queue, Process import time def write(q): for value in ["a", "b", "c"]: print("開始寫入:", value) q.put(value) time.sleep(2) def read(q): while True: if not q.empty(): print("讀取到的是", q.get()) time.sleep(2) else: break if __name__ == "__main__": q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) pw.start() pw.join() #等待接收完畢 pr.start() pr.join() print("接受完畢!") ``` ```python # 三個程序間通訊 from multiprocessing import Process from multiprocessing import Queue def func1(q1): q1.put("你好!") print(f"子程序p1往佇列q1中放入的資料為:你好!") def func2(q1, q2): msg = q1.get() print(f"子程序p2從佇列q1中取出的資料為:{msg}") q2.put(msg) print(f"子程序p2往佇列q2中放入的資料為:{msg}") def func3(q2): msg = q2.get() print(f"子程序p3從佇列q2中取出的資料為:{msg}") if __name__ == "__main__": q1 = Queue() q2 = Queue() p1 = Process(target=func1, args=(q1,)) p2 = Process(target=func2, args=(q1, q2)) p3 = Process(target=func3, args=(q2,)) p1.start() p2.start() p3.start() ``` ##### JoinableQueue 的用法 ```python # put 儲存 # get 獲取 # task_done 佇列計數減1 # join 阻塞 # task_done 配合 join 一起使用 # [1,2,3,4,5] # 佇列計數5 # put 一次 每存放一個值,佇列計數器加1 # get 一次 通過task_done讓佇列計數器減1 # join 函式,會根據佇列中的計數器來判定是阻塞還是放行 # 如果計數器變數是0,意味著放行,其他情況阻塞; from multiprocessing import Process,JoinableQueue jq = JoinableQueue() # put 會讓佇列計數器加1 jq.put("a") print(jq.get()) # 通過task_done,讓佇列計數器減1 jq.task_done() # 只有佇列計數器是0的時,才會放行 jq.join() # 佇列.join print("finish") ``` ##### 生產者——消費者模型 ###### Queue下的生產者——消費者模型: ```python # 消費者模型 def consumer(q, name): while True: food = q.get() if food is None: break time.sleep(random.uniform(0.1, 1)) print("%s 吃了一個%s" % (name, food)) # 生產者模型 def producer(q, name, food): for i in range(5): time.sleep(random.uniform(0.1, 1)) print("%s 生產了 %s%s" % (name, food, i)) q.put(food + str(i)) if __name__ == "__main__": q = Queue() # 消費者1 p1 = Process(target=consumer, args=(q, "張三")) p1.start() # 消費者2 a2 = Process(target=consumer, args=(q, "李四")) a2.start() # 生產者1 p2 = Process(target=producer, args=(q, "王五", "黃金")) p2.start() # 生產者2 b2 = Process(target=producer, args=(q, "小明", "鑽石")) b2.start() # 在生產完所有的資料之後,在佇列的末尾塞入一個None p2.join() b2.join() # 消費者模型如果獲取的是None,代表停止消費 q.put(None) q.put(None) ``` ###### JoinableQueue 下的生產者——消費者模型: ```python from multiprocessing import Process,JoinableQueue # 消費者模型 def consumer(q, name): while True: food = q.get() time.sleep(random.uniform(0.1, 1)) print("%s 吃了一個%s" % (name, food)) q.task_done() # 生產者模型 def producer(q, name, food): for i in range(5): time.sleep(random.uniform(0.1, 1)) print("%s 生產了 %s%s" % (name, food, i)) q.put(food + str(i)) if __name__ == "__main__": q = JoinableQueue() # 消費者1 p1 = Process(target=consumer, args=(q, "張三")) p1.daemon = True p1.start() # 生產者1 p2 = Process(target=producer, args=(q, "李四", "黃金")) p2.start() # 把生產者所有的資料都裝載到佇列中 p2.join() # 當佇列計數器減到0的時候,會立刻放行 # 必須等待消費者模型中所有的資料都task_done之後,變成0了就代表消費結束. q.join() print("程式結束....") ``` ###### 程序池中的程序之間的通訊 ``` from multiprocessing import Manager, Pool import time def write(q): for i in "welcome": print("開始寫入", i) q.put(i) def read(q): time.sleep(2) for i in range(q.qsize()): # q.qsize()獲取到當前佇列的訊息數量! print("得到訊息", q.get()) if __name__ == "__main__": print("主程序啟動!") q = Manager().Queue() po = Pool() po.apply_async(write, (q,)) po.apply_async(read, (q,)) po.close() po.joi