1. 程式人生 > >python 使用多程序實現併發程式設計/使用queue進行程序間資料交換

python 使用多程序實現併發程式設計/使用queue進行程序間資料交換

import time
import os
import multiprocessing

from multiprocessing import Queue, pool


"""
一.Python 使用多程序實現併發程式設計:
    因為cpython直譯器中有GIL存在的原因(每個程序都會維護一個GIL,jpython直譯器沒有這個問題),所以在一個程序內,
    即使伺服器是多核cpu,同一時刻只能有一個執行緒在執行任務(一個程序內)。如果存在較多IO,使用多執行緒是可以提高處理速度的,
    但是如果是cpu密集型任務,使用多執行緒是有一定瓶頸的。如果需要頻繁的建立和銷燬任務,可以使用多執行緒。cpu密集型任務可以考慮使用多程序。
    
二.multiprocessing模組就是跨平臺版本的多程序模組,提供了一個Process類來代表一個程序物件,這個物件可以理解為是一個獨立的程序,可以執行另外的事情.Process語法結構如下:
    Process([group [, target [, name [, args [, kwargs]]]]])
    
    target:如果傳遞了函式的引用,可以任務這個子程序就執行這裡的程式碼
    args:給target指定的函式傳遞的引數,以元組的方式傳遞
    kwargs:給target指定的函式傳遞命名引數
    name:給程序設定一個名字,可以不設定
    group:指定程序組,大多數情況下用不到
    Process建立的例項物件的常用方法:
    
    start():啟動子程序例項(建立子程序)
    is_alive():判斷程序子程序是否還在活著
    join([timeout]):是否等待子程序執行結束,或等待多少秒
    terminate():不管任務是否完成,立即終止子程序
    Process建立的例項物件的常用屬性:
    
    name:當前程序的別名,預設為Process-N,N為從1開始遞增的整數
    pid:當前程序的pid(程序號)
    
三.程序間無法直接共享記憶體,所以需要使用Queue進行資料的互動:
初始化Queue()物件時(例如:q=Queue()),若括號中沒有指定最大可接收的訊息數量,或數量為負值,那麼就代表可接受的訊息數量沒有上限(直到記憶體的盡頭); Queue.qsize():返回當前佇列包含的訊息數量; Queue.empty():如果佇列為空,返回True,反之False ; Queue.full():如果佇列滿了,返回True,反之False; Queue.get([block[, timeout]]):獲取佇列中的一條訊息,然後將其從列隊中移除,block預設值為True; 1)如果block使用預設值,且沒有設定timeout(單位秒),訊息列隊如果為空,此時程式將被阻塞(停在讀取狀態),直到從訊息列隊讀到訊息為止,如果設定了timeout,則會等待timeout秒,若還沒讀取到任何訊息,則丟擲"Queue.Empty"異常; 2)如果block值為False,訊息列隊如果為空,則會立刻丟擲"Queue.Empty"異常; Queue.get_nowait():相當Queue.get(False); Queue.put(item,[block[, timeout]]):將item訊息寫入佇列,block預設值為True; 1)如果block使用預設值,且沒有設定timeout(單位秒),訊息列隊如果已經沒有空間可寫入,此時程式將被阻塞(停在寫入狀態),直到從訊息列隊騰出空間為止,如果設定了timeout,則會等待timeout秒,若還沒空間,則丟擲"Queue.Full"異常; 2)如果block值為False,訊息列隊如果沒有空間可寫入,則會立刻丟擲"Queue.Full"異常; Queue.put_nowait(item):相當Queue.put(item, False); 注意: 多個程序同時對一個佇列進行讀取資料時,有可能會發生資料丟失的情況
""" def multi_1():
  # 測試主程序預設會等待子程序 time.sleep(
2) print("multi_1 子程序執行結束") def multi_2(pid, name="lowman"):
  # 測試向子程序的任務函式中傳遞引數 time.sleep(
2) print("主程序的pid:{}".format(pid)) # print(name) def test_queue_write(queue):
  # 往佇列中寫入資料 my_list
= ["L", "O", "W", "M", "A", "N"]
for i in my_list: # 判斷佇列是否已滿,這樣設定的話,需要注意資料量的大小和佇列的容量,否則迴圈寫入資料的時候會造成的資料的丟失 # if not queue.full(): # queue.put(i) # 如果佇列已滿,就會阻塞 queue.put(i) print("test_queue_write子程序:所有資料都已寫入了佇列中") def test_queue_read(queue):
  # 從佇列中取出資料
while True: # 判斷佇列是否為空
if not queue.empty(): print(queue.get()) else: break def test_multi(): # 建立一個子程序,並將需要執行的目標函式使用target引數進行傳遞 p = multiprocessing.Process(target=multi_1) # args:給固定引數傳參,最後必須帶一個逗號 "," kwargs:給命名引數傳參 p_2 = multiprocessing.Process(target=multi_2, args=(os.getpid(), ), kwargs={"name": "xienuo"}) # p 子程序 守護主程序,當主程序執行完成以後,主程序不會等待 p 子程序 # p.daemon = True # 啟動子程序,預設情況主程序會守護所有子程序,等所有子程序執行結束後,才會退出 p.start() p_2.start() # 主程序執行完畢,直接銷燬 p 子程序,不管子程序是否已執行完畢,和daemon具有相似的功能 # p.terminate() # 等待 p 子程序結束後再執行下一步 # p.join() # 等待 p_2 子程序結束後再執行下一步 # p_2.join() # 父程序建立Queue,並傳給各個子程序,可以設定佇列的上限,不設定則表示沒有上限 q = Queue() pw = multiprocessing.Process(target=test_queue_write, args=(q,)) pr = multiprocessing.Process(target=test_queue_read, args=(q,)) pw.start() # 這裡設定成了讓寫操作先完成, 再進行讀操作(阻塞),這樣設定的話其實就相當於是同步執行,影響了效能 pw.join() pr.start() print("預設情況下:主程序執行結束,會等待子程序") test_multi()

Python程序池的使用請點選:https://www.cnblogs.com/lowmanisbusy/p/10259235.html