1. 程式人生 > >學習筆記-python多程序

學習筆記-python多程序

e多程序

程序的實質就是就是正在執行的程式

底層的cpu級別中根本就不會出現多個程式一起執行的情況,多核cpu能夠同時執行對應核數的程式,cpu使用的是時間片段的方式進行程式執行,由於切換的非常快,看起來就像是同時執行多個程式

為了看清實際的動作,使用的fork()進行程式設計,fork()只能在linux中執行,windows執行會直接報錯

程式碼:

import os

def test1():
    for i in range(3):
        print('程序1')

def test2():
    for i in range(5):
        print
('程序2') pid = os.fork() # 返回當前程序的子程序id,由於這裡是分裂程序,子程序實際上在執行這一句的時候是沒有pid值的,因此係統給的值是0 pids = os.fork() if pid == 0: # 表示子程序,pid的值實際上是當前程序的子程序id,如果當前就是子程序則id=0 test1() else: test2() print('執行結束')

程式在執行到os.fork()的時候進行自動的分裂,主程序分裂一個子程序,這裡存在兩個fork()

在第一個pid=os.fork()的時候存在一個主程序一個子程序,在主程序和子程序執行pids=os.fork()的時候主程序再次分裂一個子程序,二原來的子程序執行繼續分裂為兩個子程序.所以此時整個程式存在四個程序

程序id的查詢

os.getpid() # 查詢當前的程序的程序id
os.getppid() # 查詢當前程序的父程序id

當一個程序進行分裂的時候,主程序還是原來的程序,分裂的子程序的父程序id能夠在子程序查到,但是子程序沒有繼續分裂則子程序的子程序id為0

程序的記憶體複製性質

當程序進行分裂的一瞬間,主程序實際上存在自己的記憶體中間中會有很多變數,而子程序是新建立的,實際上子程序會在分裂的時候複製主程序中的記憶體去開闢一個新記憶體,那麼這兩個記憶體實際上相互獨立,因此在分裂之前定義的變數實際上是雙份,子程序中的各自對變數進行操作,其結果將互不影響

import
os # 關於全域性變數的操作 num = 0 pid = os.fork() # 建立多程序 if pid == 0: num += 1 print(f'{num}') else: num += 2 print(f'{num}') # 因為多程序的狀態下實際上在執行程序分裂的時候子程序就會複製父程序的記憶體空間 輸出結果1,2

能夠跨平臺使用的多程序模組

使用的是multiprocessing中的Process

這是一個類,想使用這個類先例項化一個物件,類上面存在多個方法,當例項化物件的時候類本身是沒有立即執行程序分裂,而在執行start方法的時候才分裂,例項化物件的時候只是進行物件引數的傳入和初始化

程式碼入下

import time
from multiprocessing import Process  # 匯入多程序模組

import os


def test1(args):
    time.sleep(1)
    print(f'{os.getpid()}子程序1程序號,傳入的引數{type(args)}')
    print(f'父程序id{os.getppid()}')
def test2(args1,args2):
    time.sleep(2)
    print(f"{os.getpid()}子程序2程序號,傳入的引數{args1,args2}")
    print(f'父程序id{os.getppid()}')

def main():
    # 建立程序物件
    p1 = Process(target=test1,args=(1,)) # 第一個程序
    p1.start() # 啟動程序
    p2 = Process(target=test2,args=(10,12)) # 第二個程序
    p2.start()
    # 主程序執行
    print(f'主程序main{os.getpid()}')

if __name__ == '__main__':
    main()

其中引數的傳入有點坑,本身Process上面有args和kwargs兩個引數,前一個是傳一般的引數,後一個傳的是關鍵字引數,一般的位置引數通常是一個可迭代的物件,在例項化的時候要麼是元組要麼是列表,而kwargs是關鍵字引數,直接傳入的是字典

target表示分裂的程序需要執行對應的函式的函式名

在函式的接收的時候,如果傳入的args不止一個元素,那麼函式就要接收對應個數的元素,不用去計較引數名,但是一旦是kwargs引數則表明是關鍵字引數,引數名必須一致

from multiprocessing import Process

def test(x,y,key1,key2):
    print(f'args引數{x}和{y},kwags引數為:{key1,key2}')

def main():
    p1 = Process(target=test,args=[1,2],kwargs={'key1':10,'key2':20})
    # 啟動p1分裂程序
    p1.start()
    print('結束')

if __name__ == '__main__':
    main()

執行結果:

結束
args引數12,kwags引數為:(10, 20)

使用類的多程序寫法

import os
import time
from multiprocessing import Process
# 進行類的寫法
class MyProcess(Process):
    def __init__(self,args,**kwargs):
        # 可以有兩種方式進行父類的初始化
        super(MyProcess, self).__init__()
        # 或者使用的是
        # Process.__init__()
        self.args = args
        self.kwargs = kwargs

    def run(self): # 這裡必須寫run,固定格式,在物件start的時候就自動執行這裡的run
        time.sleep(2)
        print(f'在類裡面進行程序執行{os.getpid()},引數為{self.args},關鍵字引數為:{self.kwargs}')


if __name__ == '__main__':
    # 例項化一個物件
    my_dict = {'key1':0,'key2':1}
    p1 = MyProcess(111,**my_dict)
    p1.start()
    # p1.join()
    print('主程序結束')

類的程序寫法中需要注意的是類方法的問題,必須寫run,這種是固定的寫法,在繼承父類的情況寫寫init的時候需要將父類的初始化方法執行,否則會報錯

程序池

程序池說白了就是自行設定的容器,如果有三個容器,需要執行的程序有10個,則每個容器只能執行一個程序,多餘7個在外面排隊等著,三個正在執行的程序誰先結束,則會空一個容器出來,那麼其餘的7個程序只能有一個進到容器中進行執行,就像排隊一樣,這就是程序池的作用,你可以自己規定容器的個數

from multiprocessing import Pool # 匯入程序池
import os
import random
import time
def show(i):
    print(f'這裡是程序:{os.getpid()},迴圈次數{i}')
    # 隨機睡眠時間
    time.sleep(random.random()*2)

if __name__ == '__main__':
    pool = Pool(3) # 建立三個容器
    # 進行迴圈開啟程序
    for i in range(10):
        pool.apply_async(show,(i,)) # 開啟非同步執行
    # 開啟程序池之後進行關閉程序池
    pool.close()
    # 執行等待所有的程序執行完成
    pool.join()
    # 最後執行主執行緒
    print('主執行緒結束')

多程序之間的通訊

程序與程序之間實際上是兩個獨立的記憶體空間,那麼變數之間是沒喲辦法進行傳遞的,如果需要進行傳遞,則需要進行公共的記憶體來儲存兩個程序之間的變數資訊,這個公共儲存變數的記憶體空間實際上是利用佇列的儲存方式進行儲存,先進的先出儲存方式.因此如果需要進行變數的傳遞,程序不能直接傳到另一個程序中,而是先將需要的資料傳遞到中間的公共記憶體部分,也就是佇列裡面,另外一個程序則從公共的部分裡面進行獲取資料,從而實現程序之間的通訊

採用的是Queue的方式進行傳遞,Queue就像一個公共儲存的地方,兩個程序可以從中程序資料的拿取

import time
from multiprocessing import Process,Queue
# 程序之間的通訊使用的是佇列的方式
def put_function(q):
    data = ['string','list','dict',{'key':'int'}]
    for i in data:
        q.put(i)
    print('將data存進中間的佇列')

def get_function(q):
    time.sleep(2) # 保證put裡面有資料,該程序先執行則將直接結束,所以進行延遲
    while True:
        data = q.get()
        if data: # 判定佇列中是否有值,如果有則輸出
            print(data)
        else:
            break

if __name__ == '__main__':
    # 建立佇列
    q = Queue() # 通常並不限制佇列中的資料個數
    p1 = Process(target=put_function, args=(q,))
    p2 = Process(target=get_function, args=(q,))
    # 以上是建立了兩個程序但是還沒有執行
    # 執行p1程序
    p1.start()
    p2.start()

這裡比較難受的是當如果不進行設定時間延遲,則可能導致p2程序先向佇列中進行資料拿取,而此時p1還沒有將資料put進佇列,從而直接到時p2程序進入break結束掉,這裡進行延遲就是為了保證p2在拿資料的時候存在資料,當然為了更安全起見還可以在p2.start()之前進行p1.join()操作,在p1沒有結束之前不執行後面的程式碼