Python多程序程式設計及多程序間的通訊,資料傳輸
多程序程式設計及程序間的通訊
- 意義:充分利用計算機的資源提高程式的運算速率
- 定義:通過應用程式利用計算機多個核心達到同時執行多個任務的目的,以此提高計算機的執行速率
- 實施方案:多程序 多執行緒
- 並行: 計算機同時處理多個任務
- 併發:同時處理多個任務,核心在不斷的任務間小虎切換,達到好像還都在處理執行的效果,但是實際是一個時間點核心只能處理其中一個任務
多程序的優缺點
-
優點
- 可以使用計算機多核,進行任務的併發執行,提高執行效率
- 空間獨立,資料安全
- 執行不受其他程序影響,建立方便
-
缺點
- 程序的刪除和建立消耗的系統資源多
程序(process)
檢視程序:ps -aux
檢視程序樹:pstree
ps -ajx
標誌 | 名稱 | 說明 |
---|---|---|
S | 等待態 | 可中斷等待 |
D | 等待態 | 不可中斷等待 |
T | 等待態 | 暫停狀態 |
R | 執行態 | 包含就緒狀態 |
Z | 殭屍程序 | |
< | 高優先順序 | |
N | 優先順序較低 | |
l | 有子程序的 | |
s | 會話組組長 | |
+ | 前臺程序 |
三態
- 就緒態:程序具備執行條件,等待系統分配資源
- 執行態:程序佔有CPU,處於執行狀態
- 等待態:程序暫時不具備執行條件,阻塞等待,滿足條件後再執行
五態(三態的基礎上增加了新建態和終止態)
- 新建態:建立一個新的程序,獲取資源的過程
- 終止態:程序執行結束,資源釋放回收的過程
程序優先順序
-
作用:決定了一個程序的執行許可權
-
動態檢視系統中的程序資訊:
top
,用< , >
翻頁- 取值範圍:-20 -- 19 -20優先順序最高
-
使用指定的優先順序執行程式
-
nice
: 指定執行的優先順序e.g. :
nice -9 ./while.py
--->>以優先順序-9執行
-
程序特徵
- 程序之間執行互不影響,各自獨立執行
- 程序是作業系統資源分配的最小單位
- 每個程序空間獨立,各自佔有一定的虛擬記憶體
孤兒程序
- 父程序先於子程序退出,此時子程序就稱為孤兒程序
- 孤兒程序會被作業系統指定的程序收養,系統程序就成為了孤兒程序的父程序
殭屍程序
- 子程序先於父程序退出,但是父程序沒有處理子程序的退出狀態,此時子程序就會成為殭屍程序
- 殭屍程序會存留少量的PCB資訊在記憶體中,大量的殭屍程序會消耗系統資源,應該避免殭屍程序的產生
-
如何避免殭屍程序的產生
-
處理子程序的退出狀態
pid, status = os.wait()
功能:在父程序中堵塞等待子程序退出
返回值:
-
要求理解
- 什麼是程序?
- 瞭解程序特徵
- 清楚程序每種狀態,
多程序程式設計
1 2 3 # 功能 : 建立新的程序 4 # 引數 : 無 5 # 返回值 : 失敗--->返回一個負數 6 # 成功--->在新的程序中返回 ‘0’ 7 # 在原有的程序中返回新的程序的PID號 8 import os 9 pid = os.fork() 10 pid = os.fork() 11 print(pid) 12 13 '''列印 14 9352 15 0 16 9353 17 '''
- 子程序會複製父程序的全部程式碼,包括fork之前產生的記憶體空間
- 子程序從fork的下一句開始執行,與父程序互不干擾
- 父子程序的執行順序不一定,父子程序公用一個終端顯示
- 父子程序通常會根據fork返回值的差異選擇執行不同的程式碼。所以if結構幾乎是fork的標配
- 父子程序空間獨立,操作都是本空間的內容,互不影響
- 子程序也有自己的特性,比如PID號,PCB,命令集等。
程序相關的函式
函式方法 | 引數 | 說明 |
---|---|---|
os.getpid() | 返回當前程序的PID號 | |
os.getppid() | 返回當前程序的父程序的PID號 | |
os._exit( status ) | 程式的退出狀態 | 程序退出 |
sys.exit( [ status ] ) | 數字:表示退出狀態,不寫預設為 | 程序退出 |
import os pid = os.fork() if pid < 0: print("建立程序失敗") elif pid == 0: print("子程序我的真實PID為:",os.getpid(),"我的父程序PID為:",os.getppid()) else: print("我是父程序執行的程式碼,當前的變數pid為:",pid,"我的真實PID為:",os.getpid()) '''列印內容'''''' 我是父程序執行的程式碼,當前的變數pid為: 10992 我的真實PID為: 10991 子程序我的真實PID為: 10992 我的父程序PID為: 10991 ''''''列印內容''' # 如果pid 10992和子程序真實PID不同,那麼這個子程序就變成了孤兒程序
多程序模組
import multiprocessing
from multiprocessing import Process
Process()
-
功能:建立一個程序物件
-
引數
- name:程序名稱
- target:繫結函式
- args:元組,給target函式按照位置傳參
- kwargs:字典,給target函式按照鍵值對傳參
- name: 字串,新的程序的名字
- 例如:p = Process(target = fun,args=(a,b))
函式方法 | 說明 |
---|---|
p.start() | 啟動程序,target函式自動執行,此時程序被真正建立 |
p.join([timeout]) | 阻塞等待回收子程序,timeout為超時時間 |
p.is_alive() | 判斷程序生命週期狀態,處於生命週期,返回布林值 |
p.name() | 獲取程序名稱 |
p.pid() | 獲取程序 的pid |
p.daemon() | 預設狀態False,主程序退出不影響子程序。True :子程序隨著主程序結束 |
-
使用multiprocessing建立子程序,同樣子程序複製父程序的全部程式碼,父子程序各自執行互不影響,父子程序有各自的執行空間
-
如果不使用join揮手子程序則子程序退出後會成為殭屍程序
from multiprocessing import Process from time import sleep #帶引數的程序函式 def worker(sec,name): for i in range(3): sleep(sec) print("I'm %s"%name) print("I'm working...") p = Process(target = worker,args = (2,),\ kwargs = {'name':'Daivl'},name = "Worker") p.start() print("Process name:",p.name) #程序名稱 print("Process PID:",p.pid) #獲取程序PID #程序alive情況 print("Process is alive:",p.is_alive()) p.join(3) print("==================")
建立自定義繼承Process類
-
繼承Process
-
編寫自己的
__init__
,同時載入父類的__init__
方法 -
重寫
run
方法,可以通過生成的物件呼叫start自動執行
from import Process import time class ClockProcess(Process): def __init__(self,value): self.value = value super(ClockProcess,self).__init__() def run(self): for i in range(5): print("現在的時間是",time.ctime()) time.sleep(self.value) # 建立自定義進的類的物件 p =ClockProcess(2) # 自動呼叫run p.start() p.join()
程序池技術
-
產生原因
- 如果有大量任務需要多程序完成。則需要頻繁的建立刪除程序,給計算機帶來較多的資源消耗
-
原理
- 建立適當的程序放入程序池,用來池裡待處理的時間,處理完當前任務後進程不銷燬,仍然哎程序池等待處理其他時間,程序的複用降低了系統資源的消耗
-
使用方法
- 建立程序池,在池內放入適當的程序
- 將事件加入到事件等待佇列
- 不斷取程序執行時間,直到所有程序執行完畢
- 關閉程序池,回收程序
Pool函式
-
Pool(Processes)
- 功能:建立程序池物件
- 表示程序池中有多少程序
-
pool.apply_async(func, args, kwds)
-
功能:將時間放入程序池佇列
-
引數
- func:事件函式
- args:以元組形式給func傳參
- kwds : 以字典形式給func傳參
-
返回值一個物件
-
-
pool.apply(func, args, kwds)
-
功能:將時間放入程序池佇列
-
引數
- func:事件函式
- args:以元組形式給func傳參
- kwds : 以字典形式給func傳參
-
-
pool.close()
- 功能:關閉程序池
-
pool.join()
- 功能:回收程序池
-
pool.map(func,iter)
-
功能:將要做的事件放入程序池
-
引數
- func : 要執行的函式
- iter : 迭代物件
-
返回值:多次return的結果列
-
from multiprocessing import Process import time class ClockProcess(Process): def __init__(self,value): self.value = value super(ClockProcess,self).__init__() def run(self): for i in range(5): print("現在的時間是",time.ctime()) time.sleep(self.value) # 建立自定義進的類的物件 p =ClockProcess(2) # 自動呼叫run p.start() p.join()
程序間的通訊(IPC)
程序間通訊的方法有:管道,訊息佇列,共享記憶體,訊號,訊號量,套接字
管道 | 訊息佇列 | 共享記憶體 | |
---|---|---|---|
開闢空間 | 記憶體 | 記憶體 | 記憶體 |
讀寫方式 | 兩端讀寫[雙向/單向] | 先進先出 | 覆蓋之前的內容 |
效率 | 一般 | 一般 | 較高 |
應用 | 多用於父子程序 | 廣泛靈活 | 需要注意互斥 |
管道通訊
通訊原理:在記憶體中開闢管道空間,生成管道操作物件,多個程序使用“同一個”管道物件進行操作,即可實現通訊
multiprocessing --> Pipe
-
fd1,fd2 = Pipe(duplex = True)
-
功能:建立管道
-
引數
- 預設表示為雙向管道
- 設定False則為單向管道
-
返回值
- 如果是雙向管道則都可以讀寫
- 如果是單向管道,則fd1只讀,fd2只寫
-
-
fd.recv()
- 功能:從管道讀取資訊
- 返回值:讀取到的內容
- 如果管道為空則堵塞
-
fd.send(data)
- 功能:向管道中寫入內容
- 引數:要寫入的內容
- 可以傳送任意Python資料型別
多程序管道傳輸資料示例
from multiprocessing import Process,Pipe import time,os # 建立管道物件 fd1, fd2 = Pipe() def fun(name): time.sleep(1) fd2.send(os.getppid()) jobs = [] # 建立5個子程序 for i in range(5): p = Process(target = fun,args = (i,)) jobs.append(p) p.start() for i in range(5): data = fd1.recv() print(data) for i in jobs: i.join()
訊息佇列
佇列:先進先出,按照順序來
通訊原理:在記憶體中建立佇列資料結構模型。多個程序都可以通過佇列存入內容,取出內容的順序和存入的順序保持一致
-
建立佇列
q = Queue(maxsize = 0)
- 功能:建立佇列訊息
- 引數:表示最多儲存多少訊息。預設表示根據記憶體分配儲存
- 返回值:佇列物件
-
q.put(data, [block, timeout])
-
功能:像佇列儲存訊息
-
引數
- data:要存的內容
- block:預設佇列滿時候會堵塞,設定False則非堵塞
- timeout:超時時間
-
-
data = q.get([block, timeout])
-
功能:獲取佇列訊息
-
引數
- block:預設設定佇列空時會堵塞,設定為False則非堵塞
- timeout:超時時間
-
返回值:返回取出的內容
-
-
q.full()
:判斷佇列是否為滿 -
q.empty()
:判斷佇列是否為空 -
q.size()
:判斷佇列中的訊息數量 -
q.close
:關閉佇列
單程序示例
#!《 單程序 》 from multiprocessing import Queue from time import sleep # 建立佇列,可以放3條訊息 q = Queue(3) # 存一條訊息 q.put(1) sleep(0.5) # 判斷佇列是否為空 print(q.empty()) q.put(2) # 判斷佇列是否滿 print(q.full()) q.put(3) # 輸出佇列訊息數量 print(q.qsize()) # 輸出 print(q.get()) q.close()
多程序訊息佇列傳遞資料
from multiprocessing import Queue,Process from time import sleep # 建立佇列,可以放3條訊息 q = Queue(3) def fun1(): sleep(1) q.put({"a":1,"b":2}) def fun2(): sleep(2) print("收到訊息",q.get()) p1 = Process(target = fun1) p2 = Process(target = fun2) p1.start() p2.start() p1.join() p2.join()
共享記憶體
通訊原理:在記憶體中開闢一段空間,對多個程序可見,程序可以寫入可以讀,但是每次寫入的內容會覆蓋上一次的內容。
只能進行單個數據的傳送
-
obj = Value(ctype, obj)
-
功能:開闢共享記憶體空間
-
引數
- ctype:要儲存的資料型別
- obj:共享記憶體的初始資料
-
返回值:共享記憶體物件
-
obj.value
即共享記憶體值,對其修改即可修改記憶體 -
from multiprocessing import Value from time import sleep import os # 建立共享記憶體物件 money = Value('i',2000) # 操作共享記憶體 def deposite(): while True: i = int(input("請輸入:")) money.value = i sleep(0.05) def withdraw(): data = money.value while True: if data != money.value : data = money.value print(data) pid = os.fork() if pid == 0 : deposite() withdraw()
-
-
obj = Array(ctype, obj)
-
功能:開闢共享記憶體空間
-
引數:
- ctype:要存入的資料格式
- obj:初始化存入的內容,比如列表、字串。如果是數字則代表開闢記憶體空間的個數
-
返回值:返回共享記憶體物件,可以遍歷
-
from multiprocessing import Array,Process from time import sleep import os # 開闢100字元記憶體空間,'c'代表字元,'i'代表整形 shm = Array('c',100) # 必須使用位元組流 shm.value = "哈哈哈".encode() def fun1(): print(os.getpid(),"子程序1:",shm.value.decode()) shm.value = "夜夜夜".encode() def fun2(): sleep(1) print(os.getpid(), "子程序2:",shm.value.decode()) p1 = Process(target = fun1) p2 = Process(target = fun2) p1.start() p2.start() p1.join() p2.join()
-
訊號通訊
一個程序向另一個程序傳送一個訊號來傳遞某種資訊,接受者根據傳遞的資訊來做相應的事
$ kill -l
檢視系統訊號說明
$ kill -9 pid號
對程序傳送訊號
訊號名稱 | 說明 |
---|---|
1) SIGHUP | 連線斷開 |
2) SIGINT | ctrl+c |
3) SIGQUIT | ctrl+\ |
20) SIGTSTP | ctrl+z |
9) SIGKILL | 終止程序 |
19) SIGSTOP | 暫停程序 |
26) SIGVTALRM | 時鐘訊號 |
17) SIGCHLD | 子程序退出時給父程序發的訊號 |
在Python中import signal
可以獲取訊號
-
os.kill(pid, sig)
-
功能:傳送訊號
-
引數
- pid:要傳送訊號的PID號
- sig :訊號名稱
-
import os import signal os.kill(12345,signal.SIGKILL) #殺死程序