1. 程式人生 > >python3多程序多種使用情況下詳解

python3多程序多種使用情況下詳解

序言

最近在寫一個專案,需要使用到多程序。由於整個網路伺服器由自己開發,並沒有使用模組,所以在多程序上面越用越多的疑惑。經過一系列的測試,對整個python多程序有了更多的認識。

最大體會

每當使用multiprocessing建立新的程序,會複製主程序的所有狀態和引數。所以此時呼叫主程序的全域性變數是可以的,但是修改之後不會上傳會主程序。
**

Process程序建立時,子程序會將主程序的Process物件完全複製一份。

**
在新的主程序類裡面建立新類的時候,如果要使用全域性變數,則需要將主程序一個變數賦值為全域性變數繫結地址之後,再將這個主程序的變數賦值給新類,就可以全域性使用了。但是如果沒有這麼做,name就不會得到全域性變數。子類修改值不會改到主類上面去。

有關linux程序的操作指令

cpu時間片
如果一個程序佔有計算機核心,我們稱為改程序佔有計算機cpu時間片。

  • 多個任務之間是爭奪cpu的關係
  • 誰佔有cpu最終是作業系統決定

PCB (程序控制塊)
在記憶體中開闢的一塊空間,用來記錄程序的資訊,程序控制塊是作業系統查詢識別程序的標誌。

程序資訊 : ps -aux
PID(process ID) : 在作業系統中每個程序都有一個唯一的ID號用來區別於其他程序。ID號由作業系統自動分配,是一個大於0的整數

父子程序 : 在系統中除了初始化程序,每一個程序都有一個父程序,可能有0個或者多個子程序。由此形成父子程序關係。

檢視程序樹 : pstree
檢視父程序PID: ps -ajx
ps -aux —> STAT
S 等待態 (可中斷等待)
D 等待態 (不可中斷等待)
T 等待態 (暫停狀態)
R 執行態 (包含就緒態)
Z 殭屍程序

< 高優先順序程序
N 優先順序較低
l 有子程序的
s 會話組組長

  • 前臺程序

程序優先順序
檢視程序優先順序
top 動態檢視系統中的程序資訊, 用<>翻頁
取值範圍 -20 – 19 -20優先順序最高

使用指定的優先順序執行程式
nice : 指定執行的優先順序
e.g. nice -9 ./while.py 以優先順序9執行
nice --9 ./while.py 以-9優先順序執行

程序特徵

  1. 程序之間執行互不影響 各自獨立執行
  2. 程序是作業系統資源分配的最小單位
  3. 每個程序空間獨立,各自佔有一定的虛擬記憶體

fork

fork是一種只能在linux系統使用的基礎的多程序操作,操作比較簡單,但是不易管理。在只建立單個多程序時很方便,多次建立程序會導致程序難以管理。優點是高效率,缺點是難操作。目前想在高併發情況下可能需要使用

import os 
from time import sleep

print("*******************")
a = 1

pid = os.fork()

if pid < 0:
    print("建立程序失敗")
elif pid == 0:
    print("這是新的程序,新的程序操作在這裡進行。程序結束之後一定要回收程序!!!")
    print("a = ",a)
    a = 10000
else:
    sleep(1)
    print("這是原有程序,原有的程序在這裡繼續")
    print("parent a =",a)

print("演示完畢") 

獲取程序PID
os.getpid()
功能 : 獲取當前程序的程序號
返回值 : 返回程序號

os.getppid()
功能 : 獲取當前程序父程序的PID號
返回值 : 返回程序號

程序退出

os._exit(status)
功能 : 程序退出
引數 : 程序的退出狀態

sys.exit([status])
功能 : 程序退出
引數 : 數字表示退出狀態,不寫預設為0
字串,表示退出時列印的內容

  • sys.exit 可以通過捕獲 SystemExit異常阻止退出

如何避免殭屍程序產生

處理子程序退出狀態
pid,status = os.wait()
功能 :在父程序中阻塞等待處理子程序退出
返回值: pid 退出的子程序的PID號
status 獲取子程序退出狀態

pid,status = os.waitpid(pid,option)
功能 :在父程序中阻塞等待處理子程序退出
引數 : pid -1 表示等待任意子程序退出
>0 表示等待對應PID號的子程序退出
option 0 表示阻塞等待
WNOHANG 表示非阻塞
返回值: pid 退出的子程序的PID號
status 獲取子程序退出狀態

waitpid(-1,0) ===> wait()

  • 讓父程序先退出
    1. 父程序建立子程序等待子程序退出
    2. 子程序建立二級子程序後立即退出
    3. 二級子程序稱為孤兒,和原來的父程序各自執行 事件

multiprocessing

可以在windows和linux上使用的程序方式。操作簡單,主要有兩種,繼承多程序類和直接使用程序方法。

1、使用程序方法:

import multiprocessing as ms
def test():
    pass
p1 = ms.Process(target=test) # 建立子程序
p1.start() # 子程序 開始執行
p1.join() # 等待子程序結束,linu
  • 使用multiprocessing建立子程序,同樣子程序複製父程序的全部程式碼段,父子程序各自執行互不影響,父子程序有各自的執行空間。
  • 當子程序執行完畢後,會產生一個殭屍程序,其會被join函式回收,或者再有一條程序開啟,start函式也會回收殭屍程序,所以不一定需要寫join函式。
  • windows系統在子程序結束後會立即自動清除子程序的Process物件,而linux系統子程序的Process物件,如果沒有join函式和start函式的話會在主程序結束後統一清除。

multiprocessing最大的優勢在於windows不需要進行程序回收,但是在linux仍然需要考慮回收程序(比如在伺服器上等長期執行的環境中),親測start函式也會回收殭屍程序,這個模組六翻了
繼承Process來建立程序類:

  1. 繼承Process
  2. 編寫自己的__init__ ,同時載入父類init方法
  3. 重寫run方法,可以通過生成的物件呼叫start自動執行
from multiprocessing import Process
import os
import time
class MyProcess(Process):
    #重新init方法
    def __init__(self,interval):
        #下面一句是呼叫父類init方法,這一本儘量不要少,因為父類還有很多事情需要在init方法內處理
        Process.__init__(self)
        self.interval=interval

    #重寫run方法
    def run(self):
        print("子程序執行中,pid=%d,父程序:%d" % (os.getpid(), os.getppid()))
        t_start=time.time()
        time.sleep(self.interval)
        t_end=time.time()
        print("子程序執行結束,耗時:%0.2f秒"%(t_end-t_start))

if __name__=="__main__":
    t_start=time.time()
    print("父程序開始執行")
    p=MyProcess(2)
    p.start()
    p.join()
    t_end=time.time()
    print("父程序執行結束,耗時:%0.2f秒" % (t_end - t_start))

Process其他方法:

p.is_alive() 判斷程序生命週期狀態,處於生命週期得到True否則返回False
p.name 程序名稱 預設為Process-1
p.pid 程序的PID號
p.daemon
預設狀態False 主程序退出不會影響子程序執行,如果設定為True 則子程序會隨著主程序結束而結束。

  • 要在start前設定
  • 一般不和join一起使用

程序池(Pool)

產生原因 : 如果有大量任務需要多程序完成,則可能需要頻繁的建立刪除程序,給進算計帶來較多的資源消耗。
原理 : 建立適當的程序放入程序池,用來處理待處理事件,處理完畢後進程不銷燬,仍然在程序池中等待處理其他事件。 程序的複用降低了資源的消耗

Pool(processes)
功能 : 建立程序池物件
引數 :表示程序池中有多少程序

pool.apply_async(func,args,kwds)
功能 : 將事件放入到程序池佇列
引數 : func 事件函式,args 以元組形式給func傳參,kwds 以字典形式給func傳參
返回值 : 返回一個代表程序池事件的物件

pool.apply(func,args,kwds)
功能 : 將事件放入到程序池佇列
引數 : func 事件函式,args 以元組形式給func傳參,kwds 以字典形式給func傳參

pool.close()
功能: 關閉程序池

pool.join()
功能:回收程序池

pool.map(func,iter)
功能: 將要做的時間放入程序池
引數: func 要執行的函式 iter 迭代物件
返回值 : 返回事件函式的返回值列表

from multiprocessing import Pool 
from time import sleep,ctime 

def worker(msg):
    sleep(2)
    print(msg)
    return ctime()

#建立程序池
pool = Pool(processes = 4)

result = []
for i in range(10):
    msg = "hello %d"%i
    #將事件放入程序池佇列,等待執行
    r = pool.apply_async(func = worker,args = (msg,))
    result.append(r)

#關閉程序池
pool.close()

#回收
pool.join()

for i in result:
    print(i.get())  #獲取事件函式的返回值

程序通訊

程序間通訊 (IPC)

原因 : 程序空間相對獨立,資源無法相互獲取,此時在不同程序間通訊需要專門方法。

程序間通訊方法 : 管道 訊息佇列 共享記憶體 訊號
訊號量 套接字

管道通訊 Pipe

通訊原理 : 在記憶體中開闢管道空間,生成管道操作物件,多個程序使用"同一個"管道物件進行操作即可實現通訊

multiprocessing —》 Pipe

fd1,fd2 = Pipe(duplex = True)
功能 : 建立管道
引數 : 預設表示雙向管道, 如果設定為False則為單向管道
返回值 : 表示管道的兩端,如果是雙向管道 都可以讀寫,如果是單向管道 則fd1只讀 fd2只寫

fd.recv()
功能 : 從管道讀取資訊,返回值: 讀取到的內容, 如果管道為空則阻塞

fd.send(data)
功能:向管道寫入內容,引數: 要寫入的內容,可以傳送python資料型別

訊息佇列
佇列 : 先進先出
通訊原理 : 在記憶體中建立佇列資料結構模型。多個程序都可以通過佇列存入內容,取出內容的順序和存入順序保持一致

建立佇列
q = Queue(maxsize = 0)
功能 : 建立訊息佇列
引數 : 表示最多存放多少訊息。預設表示根據記憶體分配存 儲
返回值 : 佇列物件

q.put(data,[block,timeout])
功能: 向佇列儲存訊息
引數 :data 要存的內容,block 預設佇列滿時會阻塞,設定為False則非阻塞,timeout 超時時間

data = q.get([block,timeout])
功能:獲取佇列訊息
引數:block 預設佇列空時會阻塞,設定為False則非阻塞, timeout 超時時間
返回值 : 返回取出的內容
q.full() 判斷佇列是否為滿
q.empty() 判斷佇列是否為空
q.qsize() 判斷佇列中訊息數量
q.close() 關閉佇列
共享記憶體
通訊原理:在記憶體空開闢一塊空間,對多個程序可見,程序可以寫入輸入,但是每次寫入的內容會覆蓋之前的內容。

obj = Value(ctype,obj)
功能 : 開闢共享記憶體空間
引數 : ctype 要儲存的資料型別,obj 共享記憶體的初始化資料
返回 :共享記憶體物件

obj.value 即為共享記憶體值,對其修改即修改共享記憶體

obj = Array(ctype,obj)
功能 : 開闢共享記憶體空間
引數 : ctype 要儲存的資料格式,obj 初始化存入的內容 比如列表,字串,如果是整數則表示開闢空間的個數
返回值 : 返回共享記憶體物件
* 可以通過遍歷過戶每個元素的值
e.g. [1,2,3] —> obj[1] == 2
* 如果存入的是字串
obj.value 表示字串的首地址

     管道         訊息佇列       共享記憶體

開闢空間 記憶體 記憶體 記憶體

讀寫方式 兩端讀寫 先進先出 覆蓋之前內容
雙向/單向
效率 一般 一般 較高
應用 多用於父 廣泛靈活 需要注意
子程序 進行互斥操作

訊號通訊
一個程序向另一個程序傳送一個訊號來傳遞某種訊息,接受者根據接收到的訊號進行相應的行為
kill -l 檢視系統訊號
kill -sig PID 向一個程序傳送訊號

關於訊號
訊號名稱 訊號含義 預設處理方法

SIGHUP 連線斷開
SIGINT CTRU-C
SIGQUIT CTRU-
SIGTSTP CTRL-Z
SIGKILL 終止一個程序
SIGSTOP 暫停一個程序
SIGALRM 時鐘訊號
SIGCHLD 子程序狀態改變時給父程序發出

python 傳送訊號
signal
os.kill(pid,sig)
功能: 傳送訊號
引數: pid 目標程序,sig 要傳送的訊號

import signal

signal.alarm(sec)
功能 : 向自身傳送時鐘訊號 --》 SIGALRM
引數 : sec 時鐘時間

  • 程序中只能有一個時鐘,第二個會覆蓋第一個時間

同步執行 : 按照順序逐句執行,一步完成再做下一步
非同步執行 : 在執行過程中利用核心記錄延遲發生或者準備 處理的事件。這樣不影響應用層的持續執行。 當事件發生時再由核心告知應用層處理

  • 訊號是唯一的非同步通訊方法

signal.pause()
功能:阻塞等待接收一個訊號

signal.signal(signum,handler)
功能: 處理訊號
引數: signum 要處理的訊號
handler 訊號的處理方法
SIG_DFL 表示使用預設的方法處理
SIG_IGN 表示忽略這個訊號
func 傳入一個函式表示用指定函式處理
def func(sig,frame)
sig: 捕獲到的訊號
frame : 訊號物件

訊號量(訊號燈)

原理 : 給定一個數量,對多個程序可見,且多個程序都可以操作。程序通過對數量多少的判斷執行各自的行為。
multiprocessing --》 Semaphore()
sem = Semaphore(num)
功能: 建立訊號量,引數: 訊號量初始值,返回: 訊號量物件
sem.get_value() 獲取訊號量值
sem.acquire() 將訊號量減1 當訊號量為0會阻塞
sem.release() 將訊號量加1

程序的同步互斥

臨界資源 :多個程序或者執行緒都能夠操作的共享資源
臨界區 : 操作臨界資源的程式碼段
同步 : 同步是一種合作關係,為完成某個任務,多程序或者多執行緒之間形成一種協調,按照約定或條件執行操作臨界資源。
互斥 : 互斥是一種制約關係,當一個程序或者執行緒使用臨界資源時進行上鎖處理,當另一個程序使用時會阻塞等待,直到解鎖後才能繼續使用。

Event 事件
multiprocessing --》 Event
建立事件物件
e = Event()
設定事件阻塞
e.wait([timeout])
事件設定 當事件被設定後e.wait()不再阻塞
e.set()

清除設定 當事件設定被clear後 e.wait又會阻塞
e.clear()
事件狀態判斷
e.is_set()

Lock 鎖
建立物件
lock = Lock()
lock.acquire() 上鎖 如果鎖已經是上鎖狀態呼叫此函式會阻塞
lock.release() 解鎖
with lock: 上鎖


解鎖