1. 程式人生 > >python高階(二)——多工(二)程序(1)

python高階(二)——多工(二)程序(1)

程序以及狀態

1. 程序

程式:例如xxx.py這是程式,是一個靜態的

程序:一個程式執行起來後,程式碼+用到的資源 稱之為程序,它是作業系統分配資源的基本單元。

不僅可以通過執行緒完成多工,程序也是可以的

2. 程序的狀態

工作中,任務數往往大於cpu的核數,即一定有一些任務正在執行,而另外一些任務在等待cpu進行執行,因此導致了有了不同的狀態

 

  • 就緒態:執行的條件都已經慢去,正在等在cpu執行
  • 執行態:cpu正在執行其功能
  • 等待態:等待某些條件滿足,例如一個程式sleep了,此時就處於等待態

程序的建立-multiprocessing

multiprocessing模組就是跨平臺版本的多程序模組,提供了一個Process類來代表一個程序物件,這個物件可以理解為是一個獨立的程序,可以執行另外的事情

1. 2個while迴圈一起執行

# -*- coding:utf-8 -*-
from multiprocessing import Process
import time


def run_proc():
    """子程序要執行的程式碼"""
    while True:
        print("----2----")
        time.sleep(1)


if __name__=='__main__':
    p = Process(target=run_proc)
    p.start()
    while True:
        print("----1----")
        time.sleep(1)

說明

  • 建立子程序時,只需要傳入一個執行函式和函式的引數,建立一個Process例項,用start()方法啟動

2. 程序pid

# -*- coding:utf-8 -*-
from multiprocessing import Process
import os
import time

def run_proc():
    """子程序要執行的程式碼"""
    print('子程序執行中,pid=%d...' % os.getpid())  # os.getpid獲取當前程序的程序號
    print('子程序將要結束...')

if __name__ == '__main__':
    print('父程序pid: %d' % os.getpid())  # os.getpid獲取當前程序的程序號
    p = Process(target=run_proc)
    p.start()

3. Process語法結構如下:

Process([group [, target [, name [, args [, kwargs]]]]])

  • target:如果傳遞了函式的引用,可以任務這個子程序就執行這裡的程式碼
  • args:給target指定的函式傳遞的引數,以元組的方式傳遞
  • kwargs:給target指定的函式傳遞命名引數
  • name:給程序設定一個名字,可以不設定
  • group:指定程序組,大多數情況下用不到

Process建立的例項物件的常用方法:

  • start():啟動子程序例項(建立子程序)
  • is_alive():判斷程序子程序是否還在活著
  • join([timeout]):是否等待子程序執行結束,或等待多少秒
  • terminate():不管任務是否完成,立即終止子程序

Process建立的例項物件的常用屬性:

  • name:當前程序的別名,預設為Process-N,N為從1開始遞增的整數
  • pid:當前程序的pid(程序號)

4. 給子程序指定的函式傳遞引數

# -*- coding:utf-8 -*-
from multiprocessing import Process
import os
from time import sleep


def run_proc(name, age, **kwargs):
    for i in range(10):
        print('子程序執行中,name= %s,age=%d ,pid=%d...' % (name, age, os.getpid()))
        print(kwargs)
        sleep(0.2)

if __name__=='__main__':
    p = Process(target=run_proc, args=('test',18), kwargs={"m":20})
    p.start()
    sleep(1)  # 1秒中之後,立即結束子程序
    p.terminate()
    p.join()

執行結果:

子程序執行中,name= test,age=18 ,pid=45097...
{'m': 20}
子程序執行中,name= test,age=18 ,pid=45097...
{'m': 20}
子程序執行中,name= test,age=18 ,pid=45097...
{'m': 20}
子程序執行中,name= test,age=18 ,pid=45097...
{'m': 20}
子程序執行中,name= test,age=18 ,pid=45097...
{'m': 20}

5. 程序間不同享全域性變數

# -*- coding:utf-8 -*-
from multiprocessing import Process
import os
import time

nums = [11, 22]

def work1():
    """子程序要執行的程式碼"""
    print("in process1 pid=%d ,nums=%s" % (os.getpid(), nums))
    for i in range(3):
        nums.append(i)
        time.sleep(1)
        print("in process1 pid=%d ,nums=%s" % (os.getpid(), nums))

def work2():
    """子程序要執行的程式碼"""
    print("in process2 pid=%d ,nums=%s" % (os.getpid(), nums))

if __name__ == '__main__':
    p1 = Process(target=work1)
    p1.start()
    p1.join()

    p2 = Process(target=work2)
    p2.start()

執行結果:

in process1 pid=11349 ,nums=[11, 22]
in process1 pid=11349 ,nums=[11, 22, 0]
in process1 pid=11349 ,nums=[11, 22, 0, 1]
in process1 pid=11349 ,nums=[11, 22, 0, 1, 2]
in process2 pid=11350 ,nums=[11, 22]

程序、執行緒對比

功能

  • 程序,能夠完成多工,比如 在一臺電腦上能夠同時執行多個QQ
  • 執行緒,能夠完成多工,比如 一個QQ中的多個聊天視窗

定義的不同

  • 程序是系統進行資源分配和排程的一個獨立單位.

  • 執行緒是程序的一個實體,是CPU排程和分派的基本單位,它是比程序更小的能獨立執行的基本單位.執行緒自己基本上不擁有系統資源,只擁有一點在執行中必不可少的資源(如程式計數器,一組暫存器和棧),但是它可與同屬一個程序的其他的執行緒共享程序所擁有的全部資源.

區別

  • 一個程式至少有一個程序,一個程序至少有一個執行緒.
  • 執行緒的劃分尺度小於程序(資源比程序少),使得多執行緒程式的併發性高。
  • 程序在執行過程中擁有獨立的記憶體單元,而多個執行緒共享記憶體,從而極大地提高了程式的執行效率
  • 線執行緒不能夠獨立執行,必須依存在程序中
  • 可以將程序理解為工廠中的一條流水線,而其中的執行緒就是這個流水線上的工人

優缺點

執行緒和程序在使用上各有優缺點:執行緒執行開銷小,但不利於資源的管理和保護;而程序正相反。

程序間通訊-Queue

Process之間有時需要通訊,作業系統提供了很多機制來實現程序間的通訊。

1. Queue的使用

可以使用multiprocessing模組的Queue實現多程序之間的資料傳遞,Queue本身是一個訊息列隊程式,首先用一個小例項來演示一下Queue的工作原理:


#coding=utf-8
from multiprocessing import Queue
q=Queue(3) #初始化一個Queue物件,最多可接收三條put訊息
q.put("訊息1") 
q.put("訊息2")
print(q.full())  #False
q.put("訊息3")
print(q.full()) #True

#因為訊息列隊已滿下面的try都會丟擲異常,第一個try會等待2秒後再丟擲異常,第二個Try會立刻丟擲異常
try:
    q.put("訊息4",True,2)
except:
    print("訊息列隊已滿,現有訊息數量:%s"%q.qsize())

try:
    q.put_nowait("訊息4")
except:
    print("訊息列隊已滿,現有訊息數量:%s"%q.qsize())

#推薦的方式,先判斷訊息列隊是否已滿,再寫入
if not q.full():
    q.put_nowait("訊息4")

#讀取訊息時,先判斷訊息列隊是否為空,再讀取
if not q.empty():
    for i in range(q.qsize()):
        print(q.get_nowait())

執行結果:


False
True
訊息列隊已滿,現有訊息數量:3
訊息列隊已滿,現有訊息數量:3
訊息1
訊息2
訊息3

說明

初始化Queue()物件時(例如:q=Queue()),若括號中沒有指定最大可接收的訊息數量,或數量為負值,那麼就代表可接受的訊息數量沒有上限(直到記憶體的盡頭);

  • Queue.qsize():返回當前佇列包含的訊息數量;

  • Queue.empty():如果佇列為空,返回True,反之False ;

  • Queue.full():如果佇列滿了,返回True,反之False;

  • Queue.get([block[, timeout]]):獲取佇列中的一條訊息,然後將其從列隊中移除,block預設值為True;

1)如果block使用預設值,且沒有設定timeout(單位秒),訊息列隊如果為空,此時程式將被阻塞(停在讀取狀態),直到從訊息列隊讀到訊息為止,如果設定了timeout,則會等待timeout秒,若還沒讀取到任何訊息,則丟擲"Queue.Empty"異常;

2)如果block值為False,訊息列隊如果為空,則會立刻丟擲"Queue.Empty"異常;

  • Queue.get_nowait():相當Queue.get(False);

  • Queue.put(item,[block[, timeout]]):將item訊息寫入佇列,block預設值為True;

1)如果block使用預設值,且沒有設定timeout(單位秒),訊息列隊如果已經沒有空間可寫入,此時程式將被阻塞(停在寫入狀態),直到從訊息列隊騰出空間為止,如果設定了timeout,則會等待timeout秒,若還沒空間,則丟擲"Queue.Full"異常;

2)如果block值為False,訊息列隊如果沒有空間可寫入,則會立刻丟擲"Queue.Full"異常;

  • Queue.put_nowait(item):相當Queue.put(item, False);

2. Queue例項

我們以Queue為例,在父程序中建立兩個子程序,一個往Queue裡寫資料,一個從Queue裡讀資料:

from multiprocessing import Process, Queue
import os, time, random

# 寫資料程序執行的程式碼:
def write(q):
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

# 讀資料程序執行的程式碼:
def read(q):
    while True:
        if not q.empty():
            value = q.get(True)
            print('Get %s from queue.' % value)
            time.sleep(random.random())
        else:
            break

if __name__=='__main__':
    # 父程序建立Queue,並傳給各個子程序:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 啟動子程序pw,寫入:
    pw.start()    
    # 等待pw結束:
    pw.join()
    # 啟動子程序pr,讀取:
    pr.start()
    pr.join()
    # pr程序裡是死迴圈,無法等待其結束,只能強行終止:
    print('')
    print('所有資料都寫入並且讀完')

 

程序池Pool

當需要建立的子程序數量不多時,可以直接利用multiprocessing中的Process動態成生多個程序,但如果是上百甚至上千個目標,手動的去建立程序的工作量巨大,此時就可以用到multiprocessing模組提供的Pool方法。

初始化Pool時,可以指定一個最大程序數,當有新的請求提交到Pool中時,如果池還沒有滿,那麼就會建立一個新的程序用來執行該請求;但如果池中的程序數已經達到指定的最大值,那麼該請求就會等待,直到池中有程序結束,才會用之前的程序來執行新的任務,請看下面的例項:

# -*- coding:utf-8 -*-
from multiprocessing import Pool
import os, time, random

def worker(msg):
    t_start = time.time()
    print("%s開始執行,程序號為%d" % (msg,os.getpid()))
    # random.random()隨機生成0~1之間的浮點數
    time.sleep(random.random()*2) 
    t_stop = time.time()
    print(msg,"執行完畢,耗時%0.2f" % (t_stop-t_start))

po = Pool(3)  # 定義一個程序池,最大程序數3
for i in range(0,10):
    # Pool().apply_async(要呼叫的目標,(傳遞給目標的引數元祖,))
    # 每次迴圈將會用空閒出來的子程序去呼叫目標
    po.apply_async(worker,(i,))

print("----start----")
po.close()  # 關閉程序池,關閉後po不再接收新的請求
po.join()  # 等待po中所有子程序執行完成,必須放在close語句之後
print("-----end-----")

執行結果:

----start----
0開始執行,程序號為21466
1開始執行,程序號為21468
2開始執行,程序號為21467
0 執行完畢,耗時1.01
3開始執行,程序號為21466
2 執行完畢,耗時1.24
4開始執行,程序號為21467
3 執行完畢,耗時0.56
5開始執行,程序號為21466
1 執行完畢,耗時1.68
6開始執行,程序號為21468
4 執行完畢,耗時0.67
7開始執行,程序號為21467
5 執行完畢,耗時0.83
8開始執行,程序號為21466
6 執行完畢,耗時0.75
9開始執行,程序號為21468
7 執行完畢,耗時1.03
8 執行完畢,耗時1.05
9 執行完畢,耗時1.69
-----end-----

multiprocessing.Pool常用函式解析:

  • apply_async(func[, args[, kwds]]) :使用非阻塞方式呼叫func(並行執行,堵塞方式必須等待上一個程序退出才能執行下一個程序),args為傳遞給func的引數列表,kwds為傳遞給func的關鍵字引數列表;
  • close():關閉Pool,使其不再接受新的任務;
  • terminate():不管任務是否完成,立即終止;
  • join():主程序阻塞,等待子程序的退出, 必須在close或terminate之後使用;

程序池中的Queue

如果要使用Pool建立程序,就需要使用multiprocessing.Manager()中的Queue(),而不是multiprocessing.Queue(),否則會得到一條如下的錯誤資訊:

RuntimeError: Queue objects should only be shared between processes through inheritance.

下面的例項演示了程序池中的程序如何通訊:


# -*- coding:utf-8 -*-

# 修改import中的Queue為Manager
from multiprocessing import Manager,Pool
import os,time,random

def reader(q):
    print("reader啟動(%s),父程序為(%s)" % (os.getpid(), os.getppid()))
    for i in range(q.qsize()):
        print("reader從Queue獲取到訊息:%s" % q.get(True))

def writer(q):
    print("writer啟動(%s),父程序為(%s)" % (os.getpid(), os.getppid()))
    for i in "itcast":
        q.put(i)

if __name__=="__main__":
    print("(%s) start" % os.getpid())
    q = Manager().Queue()  # 使用Manager中的Queue
    po = Pool()
    po.apply_async(writer, (q,))

    time.sleep(1)  # 先讓上面的任務向Queue存入資料,然後再讓下面的任務開始從中取資料

    po.apply_async(reader, (q,))
    po.close()
    po.join()
    print("(%s) End" % os.getpid())

執行結果:

(11095) start
writer啟動(11097),父程序為(11095)
reader啟動(11098),父程序為(11095)
reader從Queue獲取到訊息:i
reader從Queue獲取到訊息:t
reader從Queue獲取到訊息:c
reader從Queue獲取到訊息:a
reader從Queue獲取到訊息:s
reader從Queue獲取到訊息:t
(11095) End