1. 程式人生 > >多執行緒與多程序及Python實現【Python實現多程序】

多執行緒與多程序及Python實現【Python實現多程序】

上一篇部落格介紹了多執行緒與多程序的理論部分,這篇部落格將參考部落格以及各種教程完成Python多程序實現部分。

multiprocessing模組

Process 類

multiprocessing.Process(group=None, target=None, name=None, 
args=(), kwargs={}, *, daemon=None)
  • star() 方法啟動程序,
  • join() 方法實現程序間的同步,等待所有程序退出
  • close() 用來阻止多餘的程序湧入程序池 Pool 造成程序阻塞。
  • target 是函式名字,需要呼叫的函式
  • args 函式需要的引數,以 tuple
    的形式傳入
import multiprocessing
import os

def run_proc(name):
    print('Child process {0} {1} Running '.format(name, os.getpid()))

if __name__ == '__main__':
    print('Parent process {0} is Running'.format(os.getpid()))
    for i in range(5):
        p = multiprocessing.Process(target=run_proc, args=
(str(i),)) print('process start') p.start() p.join() print('Process close')
Parent process 53067 is Running
process start
Child process 0 55047 Running 
process start
process start
Child process 1 55048 Running 
process start
Child process 2 55049 Running 
process start
Child process 3
55050 Running Child process 4 55051 Running Process close

Process類詳解

Process 類用來描述一個程序物件。建立子程序的時候,只需要傳入一個執行函式和函式的引數即可完成 Process 示例的建立。 Process物件的初始化引數為Process(group=None, target=None, name=None, args=(), kwargs={}),其中group引數必須為None(為了與threading.Thread的相容),target指向可呼叫物件(該物件在新的子程序中執行),name是為該子程序命的名字(預設是Proess-1,Process-2, …這樣),args是被呼叫物件的位置引數的元組列表,kwargs是被呼叫物件的關鍵字引數。

子程序終結時會通知父程序並清空自己所佔據的記憶體,在核心裡留下退出資訊(exit code,如果順利執行,為0;如果有錯誤或異常狀況,為大於零的整數)。父程序得知子程序終結後,需要對子程序使用wait系統呼叫,wait函式會從核心中取出子程序的退出資訊,並清空該資訊在核心中佔據的空間。 如果父程序早於子程序終結,子程序變成孤兒程序,孤兒程序會被過繼給init程序,init程序就成了該子程序的父程序,由init程序負責該子程序終結時呼叫wait函式。如果父程序不對子程序呼叫wait函式,子程序成為殭屍程序。殭屍程序積累時,會消耗大量記憶體空間。

Process類join方法

  • 如果在父程序中不呼叫p.join方法,則主程序與父程序並行工作:
from multiprocessing import Process
import time

def func():
    print("Child process start, %s" % time.ctime())
    time.sleep(2)
    print("Child process end, %s" % time.ctime())


if __name__ == "__main__":
    print("Parent process start, %s" % time.ctime())
    p = Process(target=func)
    p.start()
    # p.join()
    time.sleep(1)
    print("Parent process end, %s" % time.ctime())

結果

Parent process start, Sun Oct  7 18:41:18 2018
Child process start, Sun Oct  7 18:41:18 2018
Parent process end, Sun Oct  7 18:41:19 2018
Child process end, Sun Oct  7 18:41:20 2018

如果開啟了p.join的呼叫,結果為

Parent process start, Sun Oct  7 18:41:37 2018
Child process start, Sun Oct  7 18:41:37 2018
Child process end, Sun Oct  7 18:41:39 2018
Parent process end, Sun Oct  7 18:41:40 2018

Process類守護程序

  • 另外一種情況是將子程序設定為守護程序,則父程序在退出時不會關注子程序是否結束而直接退出:
from multiprocessing import Process
import time

def func():
    print("Child process start, %s" % time.ctime())
    time.sleep(2)
    print("Child process end, %s" % time.ctime())


if __name__ == "__main__":
    print("Parent process start, %s" % time.ctime())
    p = Process(target=func)
    # 守護程序一定要在start方法呼叫之前設定
    p.daemon = True
    p.start()
    # p.join()
    time.sleep(1)
    print("Parent process end, %s" % time.ctime())

結果

Parent process start, Sun Oct  7 18:45:58 2018
Child process start, Sun Oct  7 18:45:58 2018
Parent process end, Sun Oct  7 18:45:59 2018
Child process end, Sun Oct  7 18:46:00 2018

如果開啟主程序對join方法的呼叫,主程序還是會等待守護子程序結束

Parent process start, Sun Oct  7 18:46:45 2018
Child process start, Sun Oct  7 18:46:45 2018
Child process end, Sun Oct  7 18:46:47 2018
Parent process end, Sun Oct  7 18:46:48 2018

Pool 類

Pool 可以提供指定數量的程序供使用者使用,預設是 CPU 核數。當有新的請求提交到 Pool 的時候,如果池子沒有滿,會建立一個程序來執行,否則就會讓該請求等待

  • Pool 物件呼叫 join 方法會等待所有的子程序執行完畢
  • 呼叫 join 方法之前,必須呼叫 close
  • 呼叫 close 之後就不能繼續新增新的 Process 了

pool.apply_async

apply_async 方法用來同步執行程序,允許多個程序同時進入池子。

import multiprocessing
import os
import time

def run_task(name):
    print('Task {0} pid {1} is running, parent id is {2}'.format(name, os.getpid(), os.getppid()))
    time.sleep(1)
    print('Task {0} end.'.format(name))

if __name__ == '__main__':
    print('current process {0}'.format(os.getpid()))
    p = multiprocessing.Pool(processes=3)
    for i in range(6):
        p.apply_async(run_task, args=(i,))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All processes done!')
current process 921
Waiting for all subprocesses done...
Task 0 pid 922 is running, parent id is 921
Task 1 pid 923 is running, parent id is 921
Task 2 pid 924 is running, parent id is 921
Task 0 end.
Task 3 pid 922 is running, parent id is 921
Task 1 end.
Task 4 pid 923 is running, parent id is 921
Task 2 end.
Task 5 pid 924 is running, parent id is 921
Task 3 end.
Task 4 end.
Task 5 end.
All processes done!

pool.apply

該方法只能允許一個程序進入池子,在一個程序結束之後,另外一個程序才可以進入池子。

import multiprocessing
import os
import time

def run_task(name):
    print('Task {0} pid {1} is running, parent id is {2}'.format(name, os.getpid(), os.getppid()))
    time.sleep(1)
    print('Task {0} end.'.format(name))

if __name__ == '__main__':
    print('current process {0}'.format(os.getpid()))
    p = multiprocessing.Pool(processes=3)
    for i in range(6):
        p.apply(run_task, args=(i,))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All processes done!')
Task 0 pid 928 is running, parent id is 927
Task 0 end.
Task 1 pid 929 is running, parent id is 927
Task 1 end.
Task 2 pid 930 is running, parent id is 927
Task 2 end.
Task 3 pid 928 is running, parent id is 927
Task 3 end.
Task 4 pid 929 is running, parent id is 927
Task 4 end.
Task 5 pid 930 is running, parent id is 927
Task 5 end.
Waiting for all subprocesses done...
All processes done!

Queue 程序間通訊

Queue 用來在多個程序間通訊(佇列,先進先出)。Queue 有兩個方法,get 和 put。

put 方法

put 方法用來插入資料到佇列中,引數為blockedtimeout

  • blocked = True(預設值),timeout 為正
該方法會阻塞 timeout 指定的時間,直到該佇列有剩餘空間。
如果超時,丟擲 Queue.Full 異常。
  • blocked = False
如果 Queue 已滿,立刻丟擲 Queue.Full 異常

get 方法

get 方法用來從佇列中讀取並刪除一個元素。引數為blockedtimeout

  • blocked = True(預設值),timeout 為正
等待時間內,沒有取到任何元素,會丟擲 Queue.Empty 異常。
  • blocked = True
Queue 有一個值可用,立刻返回改值;Queue 沒有任何元素
from multiprocessing import Process, Queue
import os, time, random
# 寫資料程序執行的程式碼:
def proc_write(q,urls):
    print('Process(%s) is writing...' % os.getpid())
    for url in urls:
        q.put(url)
        print('Put %s to queue...' % url)
        time.sleep(random.random())
# 讀資料程序執行的程式碼:
def proc_read(q):
    print('Process(%s) is reading...' % os.getpid())
    while True:
        url = q.get(True)
        print('Get %s from queue.' % url)
if __name__=='__main__':
    # 父程序建立Queue,並傳給各個子程序:
    q = Queue()
    proc_writer1 = Process(target=proc_write, args=(q,['url_1', 'url_2', 'url_3']))
    proc_writer2 = Process(target=proc_write, args=(q,['url_4','url_5','url_6']))
    proc_reader = Process(target=proc_read, args=(q,))
    # 啟動子程序proc_writer,寫入:
    proc_writer1.start()
    proc_writer2.start()
    # 啟動子程序proc_reader,讀取:
    proc_reader.start()
    # 等待proc_writer結束:
    proc_writer1.join()
    proc_writer2.join()
    # proc_reader程序裡是死迴圈,無法等待其結束,只能強行終止:
    proc_reader.terminate()
Process(1083) is writing...
Put url_1 to queue...
Process(1084) is writing...
Put url_4 to queue...
Process(1085) is reading...
Get url_1 from queue.
Get url_4 from queue.
Put url_5 to queue...
Get url_5 from queue.
Put url_2 to queue...
Get url_2 from queue.
Put url_6 to queue...
Get url_6 from queue.
Put url_3 to queue...
Get url_3 from queue.

Pipe 程序間通訊

常用來在兩個程序間通訊,兩個程序分別位於管道的兩端。 示例一

from multiprocessing import Process, Pipe

def send(pipe):
    pipe.send(['spam'] + [42, 'egg'])   # send 傳輸一個列表
    pipe.close()

if __name__ == '__main__':
    (con1, con2) = Pipe()                            # 建立兩個 Pipe 例項
    sender = Process(target=send, args=(con1, ))     # 函式的引數,args 一定是例項化之後的 Pip 變數,不能直接寫 args=(Pip(),)
    sender.start()                                   # Process 類啟動程序
    print("con2 got: %s" % con2.recv())              # 管道的另一端 con2 從send收到訊息
    con2.close()                                     # 關閉管道

結果

con2 got: ['spam', 42, 'egg']

示例二

from multiprocessing import Process, Pipe

def talk(pipe):
    pipe.send(dict(name='Bob', spam=42))            # 傳輸一個字典
    reply = pipe.recv()                             # 接收傳輸的資料
    print('talker got:', reply)

if __name__ == '__main__':
    (parentEnd, childEnd) = Pipe()                  # 建立兩個 Pipe() 例項,也可以改成 conf1, conf2
    child = Process(target=talk, args=(childEnd,))  # 建立一個 Process 程序,名稱為 child
    child.start()                                   # 啟動程序
    print('parent got:', parentEnd.recv())          # parentEnd 是一個 Pip() 管道,可以接收 child Process 程序傳輸的資料
    parentEnd.send({x * 2 for x in 'spam'})         # parentEnd 是一個 Pip() 管道,可以使用 send 方法來傳輸資料
    child.join()                                    # 傳輸的資料被 talk 函式內的 pip 管道接收,並賦值給 reply
    print('parent exit')

結果

parent got: {'name': 'Bob', 'spam': 42}
talker got: {'ss', 'aa', 'pp', 'mm'}
parent exit

共享記憶體

在程序間共享狀態可以使用multiprocessing.Valuemultiprocessing.Array這樣特殊的共享記憶體物件:

from multiprocessing import Process, Value, Array

def func(n, a):
    n.value = 3.1415926
    for i in range(len(a)):
        a[i] = -i

if __name__ == "__main__":
    # 'd'表示浮點型資料,'i'表示整數
    n = Value('d', 0.0)
    a = Array('i', range(10))
    p = Process(target=func, args=(n, a,))
    p.start()
    p.join()

    print(n.value)
    print(a[:])

結果

3.1415926
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

參考