1. 程式人生 > >Python多程序程式設計及多程序間的通訊,資料傳輸

Python多程序程式設計及多程序間的通訊,資料傳輸

多程序程式設計及程序間的通訊

  • 意義:充分利用計算機的資源提高程式的運算速率
  • 定義:通過應用程式利用計算機多個核心達到同時執行多個任務的目的,以此提高計算機的執行速率
  • 實施方案:多程序 多執行緒
  • 並行: 計算機同時處理多個任務
  • 併發:同時處理多個任務,核心在不斷的任務間小虎切換,達到好像還都在處理執行的效果,但是實際是一個時間點核心只能處理其中一個任務

多程序的優缺點

  • 優點

    • 可以使用計算機多核,進行任務的併發執行,提高執行效率
    • 空間獨立,資料安全
    • 執行不受其他程序影響,建立方便
  • 缺點

    • 程序的刪除和建立消耗的系統資源多

程序(process)

檢視程序:ps -aux 檢視程序樹:pstree

檢視父程序:ps -ajx

標誌名稱說明
S 等待態 可中斷等待
D 等待態 不可中斷等待
T 等待態 暫停狀態
R 執行態 包含就緒狀態
Z 殭屍程序
< 高優先順序
N 優先順序較低
l 有子程序的
s 會話組組長
+ 前臺程序

三態

  • 就緒態:程序具備執行條件,等待系統分配資源
  • 執行態:程序佔有CPU,處於執行狀態
  • 等待態:程序暫時不具備執行條件,阻塞等待,滿足條件後再執行

五態(三態的基礎上增加了新建態和終止態)

  • 新建態:建立一個新的程序,獲取資源的過程
  • 終止態:程序執行結束,資源釋放回收的過程

程序優先順序

  • 作用:決定了一個程序的執行許可權

  • 動態檢視系統中的程序資訊:top,用< , >翻頁

    • 取值範圍:-20 -- 19 -20優先順序最高
  • 使用指定的優先順序執行程式

    • nice : 指定執行的優先順序

      e.g. :nice -9 ./while.py --->>以優先順序-9執行

程序特徵

  1. 程序之間執行互不影響,各自獨立執行
  2. 程序是作業系統資源分配的最小單位
  3. 每個程序空間獨立,各自佔有一定的虛擬記憶體

孤兒程序

  1. 父程序先於子程序退出,此時子程序就稱為孤兒程序
  2. 孤兒程序會被作業系統指定的程序收養,系統程序就成為了孤兒程序的父程序

殭屍程序

  1. 子程序先於父程序退出,但是父程序沒有處理子程序的退出狀態,此時子程序就會成為殭屍程序
  2. 殭屍程序會存留少量的PCB資訊在記憶體中,大量的殭屍程序會消耗系統資源,應該避免殭屍程序的產生
  • 如何避免殭屍程序的產生

    • 處理子程序的退出狀態

      pid, status = os.wait()

      功能:在父程序中堵塞等待子程序退出

      返回值:

要求理解

  1. 什麼是程序?
  2. 瞭解程序特徵
  3. 清楚程序每種狀態,

多程序程式設計

 1  
 2   
 3 # 功能 : 建立新的程序
 4 # 引數 : 無
 5 # 返回值 : 失敗--->返回一個負數
 6 #  成功--->在新的程序中返回 ‘0’
 7 # 在原有的程序中返回新的程序的PID號
 8 import os
 9 pid = os.fork()
10 pid = os.fork()
11 print(pid)
12 13 '''列印
14 9352
15 0
16 9353
17 '''
  • 子程序會複製父程序的全部程式碼,包括fork之前產生的記憶體空間
  • 子程序從fork的下一句開始執行,與父程序互不干擾
  • 父子程序的執行順序不一定,父子程序公用一個終端顯示
  • 父子程序通常會根據fork返回值的差異選擇執行不同的程式碼。所以if結構幾乎是fork的標配
  • 父子程序空間獨立,操作都是本空間的內容,互不影響
  • 子程序也有自己的特性,比如PID號,PCB,命令集等。

程序相關的函式

函式方法引數說明
os.getpid() 返回當前程序的PID號
os.getppid() 返回當前程序的父程序的PID號
os._exit( status ) 程式的退出狀態 程序退出
sys.exit( [ status ] ) 數字:表示退出狀態,不寫預設為 程序退出
import os
pid = os.fork()
if pid < 0:
    print("建立程序失敗")
elif pid == 0:
    print("子程序我的真實PID為:",os.getpid(),"我的父程序PID為:",os.getppid())
else:
    print("我是父程序執行的程式碼,當前的變數pid為:",pid,"我的真實PID為:",os.getpid())
    
'''列印內容''''''
我是父程序執行的程式碼,當前的變數pid為: 10992 我的真實PID為: 10991
子程序我的真實PID為: 10992 我的父程序PID為: 10991
''''''列印內容'''
    
# 如果pid 10992和子程序真實PID不同,那麼這個子程序就變成了孤兒程序

多程序模組

import multiprocessing

from multiprocessing import Process

Process()

  • 功能:建立一個程序物件

  • 引數

    • name:程序名稱
    • target:繫結函式
    • args:元組,給target函式按照位置傳參
    • kwargs:字典,給target函式按照鍵值對傳參
    • name: 字串,新的程序的名字
    • 例如:p = Process(target = fun,args=(a,b))
函式方法說明
p.start() 啟動程序,target函式自動執行,此時程序被真正建立
p.join([timeout]) 阻塞等待回收子程序,timeout為超時時間
p.is_alive() 判斷程序生命週期狀態,處於生命週期,返回布林值
p.name() 獲取程序名稱
p.pid() 獲取程序 的pid
p.daemon() 預設狀態False,主程序退出不影響子程序。True :子程序隨著主程序結束
  • 使用multiprocessing建立子程序,同樣子程序複製父程序的全部程式碼,父子程序各自執行互不影響,父子程序有各自的執行空間

  • 如果不使用join揮手子程序則子程序退出後會成為殭屍程序

from multiprocessing import Process 
from time import sleep 
​
#帶引數的程序函式
def worker(sec,name):
    for i in range(3):
        sleep(sec)
        print("I'm %s"%name)
        print("I'm working...")
​
p = Process(target = worker,args = (2,),\
    kwargs = {'name':'Daivl'},name = "Worker")
p.start()
​
print("Process name:",p.name) #程序名稱
print("Process PID:",p.pid) #獲取程序PID
#程序alive情況
print("Process is alive:",p.is_alive())
​
p.join(3)
print("==================")

建立自定義繼承Process類

  1. 繼承Process

  2. 編寫自己的__init__,同時載入父類的__init__方法

  3. 重寫run方法,可以通過生成的物件呼叫start自動執行

from  import Process
import time
​
class ClockProcess(Process):
​
    def __init__(self,value):
        self.value = value
        super(ClockProcess,self).__init__()
​
    def run(self):
        for i in range(5):
            print("現在的時間是",time.ctime())
            time.sleep(self.value)
​
# 建立自定義進的類的物件
p =ClockProcess(2)
​
# 自動呼叫run
p.start()
p.join()    

程序池技術

  • 產生原因

    • 如果有大量任務需要多程序完成。則需要頻繁的建立刪除程序,給計算機帶來較多的資源消耗
  • 原理

    • 建立適當的程序放入程序池,用來池裡待處理的時間,處理完當前任務後進程不銷燬,仍然哎程序池等待處理其他時間,程序的複用降低了系統資源的消耗
  • 使用方法

    1. 建立程序池,在池內放入適當的程序
    2. 將事件加入到事件等待佇列
    3. 不斷取程序執行時間,直到所有程序執行完畢
    4. 關閉程序池,回收程序

Pool函式

  • Pool(Processes)

    • 功能:建立程序池物件
    • 表示程序池中有多少程序
  • pool.apply_async(func, args, kwds)

    • 功能:將時間放入程序池佇列

    • 引數

      • func:事件函式
      • args:以元組形式給func傳參
      • kwds : 以字典形式給func傳參
    • 返回值一個物件

  • pool.apply(func, args, kwds)

    • 功能:將時間放入程序池佇列

    • 引數

      • func:事件函式
      • args:以元組形式給func傳參
      • kwds : 以字典形式給func傳參
  • pool.close()

    • 功能:關閉程序池
  • pool.join()

    • 功能:回收程序池
  • pool.map(func,iter)

    • 功能:將要做的事件放入程序池

    • 引數

      • func : 要執行的函式
      • iter : 迭代物件
    • 返回值:多次return的結果列

from multiprocessing import Process
import time
​
class ClockProcess(Process):
​
    def __init__(self,value):
        self.value = value
        super(ClockProcess,self).__init__()
​
    def run(self):
        for i in range(5):
            print("現在的時間是",time.ctime())
            time.sleep(self.value)
​
# 建立自定義進的類的物件
p =ClockProcess(2)
​
# 自動呼叫run
p.start()
p.join()    

程序間的通訊(IPC)

程序間通訊的方法有:管道,訊息佇列,共享記憶體,訊號,訊號量,套接字

管道訊息佇列共享記憶體
開闢空間 記憶體 記憶體 記憶體
讀寫方式 兩端讀寫[雙向/單向] 先進先出 覆蓋之前的內容
效率 一般 一般 較高
應用 多用於父子程序 廣泛靈活 需要注意互斥

管道通訊

通訊原理:在記憶體中開闢管道空間,生成管道操作物件,多個程序使用“同一個”管道物件進行操作,即可實現通訊

multiprocessing --> Pipe

  • fd1,fd2 = Pipe(duplex = True)

    • 功能:建立管道

    • 引數

      • 預設表示為雙向管道
      • 設定False則為單向管道
    • 返回值

      • 如果是雙向管道則都可以讀寫
      • 如果是單向管道,則fd1只讀,fd2只寫
  • fd.recv()

    • 功能:從管道讀取資訊
    • 返回值:讀取到的內容
    • 如果管道為空則堵塞
  • fd.send(data)

    • 功能:向管道中寫入內容
    • 引數:要寫入的內容
    • 可以傳送任意Python資料型別

多程序管道傳輸資料示例 

from multiprocessing import Process,Pipe
import time,os
​
# 建立管道物件
fd1, fd2 = Pipe()
​
def fun(name):
    time.sleep(1)
    fd2.send(os.getppid())
​
jobs = []
# 建立5個子程序
for i in range(5):
    p = Process(target = fun,args = (i,))
    jobs.append(p)
    p.start()
​
for i in range(5):
    data = fd1.recv()
    print(data)
​
for i in jobs:
    i.join()

訊息佇列

佇列:先進先出,按照順序來

通訊原理:在記憶體中建立佇列資料結構模型。多個程序都可以通過佇列存入內容,取出內容的順序和存入的順序保持一致

  • 建立佇列

    q = Queue(maxsize = 0)

    • 功能:建立佇列訊息
    • 引數:表示最多儲存多少訊息。預設表示根據記憶體分配儲存
    • 返回值:佇列物件
  • q.put(data, [block, timeout])

    • 功能:像佇列儲存訊息

    • 引數

      • data:要存的內容
      • block:預設佇列滿時候會堵塞,設定False則非堵塞
      • timeout:超時時間
  • data = q.get([block, timeout])

    • 功能:獲取佇列訊息

    • 引數

      • block:預設設定佇列空時會堵塞,設定為False則非堵塞
      • timeout:超時時間
    • 返回值:返回取出的內容

  • q.full():判斷佇列是否為滿

  • q.empty():判斷佇列是否為空

  • q.size():判斷佇列中的訊息數量

  • q.close:關閉佇列

單程序示例

#!《 單程序 》
from multiprocessing import Queue
from time import sleep
​
# 建立佇列,可以放3條訊息
q = Queue(3)
# 存一條訊息
q.put(1)
sleep(0.5)
# 判斷佇列是否為空
print(q.empty())
q.put(2)
# 判斷佇列是否滿
print(q.full())
q.put(3)
# 輸出佇列訊息數量
print(q.qsize())
# 輸出
print(q.get())
q.close()

多程序訊息佇列傳遞資料

from multiprocessing import Queue,Process
from time import sleep
​
# 建立佇列,可以放3條訊息
q = Queue(3)
​
def fun1():
    sleep(1)
    q.put({"a":1,"b":2})
​
def fun2():
    sleep(2)
    print("收到訊息",q.get())
​
p1 = Process(target = fun1)
p2 = Process(target = fun2)
p1.start()
p2.start()
p1.join()
p2.join()

共享記憶體

通訊原理:在記憶體中開闢一段空間,對多個程序可見,程序可以寫入可以讀,但是每次寫入的內容會覆蓋上一次的內容。

只能進行單個數據的傳送

  • obj = Value(ctype, obj)

    • 功能:開闢共享記憶體空間

    • 引數

      • ctype:要儲存的資料型別
      • obj:共享記憶體的初始資料
    • 返回值:共享記憶體物件

    • obj.value即共享記憶體值,對其修改即可修改記憶體

    • from multiprocessing import Value
      from time import sleep
      import os
      # 建立共享記憶體物件
      money = Value('i',2000)
      ​
      # 操作共享記憶體
      def deposite():
          while True:
              i = int(input("請輸入:"))
              money.value = i
              sleep(0.05)
      ​
      def withdraw():
          data = money.value
          while True:
              if data != money.value :
                  data = money.value
                  print(data)
                  
      pid = os.fork()
      if pid == 0 :
          deposite()
      withdraw() 
  • obj = Array(ctype, obj)

    • 功能:開闢共享記憶體空間

    • 引數:

      • ctype:要存入的資料格式
      • obj:初始化存入的內容,比如列表、字串。如果是數字則代表開闢記憶體空間的個數
    • 返回值:返回共享記憶體物件,可以遍歷 

    • from multiprocessing import Array,Process
      from time import sleep
      import os
      ​
      # 開闢100字元記憶體空間,'c'代表字元,'i'代表整形
      shm = Array('c',100)
      # 必須使用位元組流
      shm.value = "哈哈哈".encode()
      def fun1():
          print(os.getpid(),"子程序1:",shm.value.decode())
          shm.value = "夜夜夜".encode()
      ​
      def fun2():
          sleep(1)
          print(os.getpid(), "子程序2:",shm.value.decode())
      ​
      p1 = Process(target = fun1)  
      p2 = Process(target = fun2)
      p1.start()
      p2.start()
      p1.join()
      p2.join()
      ​
      ​

訊號通訊

一個程序向另一個程序傳送一個訊號來傳遞某種資訊,接受者根據傳遞的資訊來做相應的事

$ kill -l檢視系統訊號說明

$ kill -9 pid號對程序傳送訊號

訊號名稱說明
1) SIGHUP 連線斷開
2) SIGINT ctrl+c
3) SIGQUIT ctrl+\
20) SIGTSTP ctrl+z
9) SIGKILL 終止程序
19) SIGSTOP 暫停程序
26) SIGVTALRM 時鐘訊號
17) SIGCHLD 子程序退出時給父程序發的訊號

在Python中import signal可以獲取訊號

  • os.kill(pid, sig)

    • 功能:傳送訊號

    • 引數

      • pid:要傳送訊號的PID號
      • sig :訊號名稱 
import os
import signal
os.kill(12345,signal.SIGKILL) #殺死程序