1. 程式人生 > >31 Python 多程序-multiprocessing

31 Python 多程序-multiprocessing

Python 多程序程式設計 - multiprocessing模組

程序

程序的概念

​ 程序是作業系統中最基本的概念。在多道程式系統出現後,為了刻畫系統內部出現的動態情況,描述系統內部各道程式的活動規律引進的一個概念,所有多道程式設計作業系統都建立在程序的基礎上。

狹義定義:程式的一個執行(執行)例項;

廣義定義:程序是一個具有一定獨立功能的程式,關於某個資料集合的一次執行活動,是系統進行資源分配和排程(執行)的基本單位,是作業系統結構的基礎。在早期面向程序設計的計算機結構中,程序是程式的基本執行實體;在當代面向執行緒設計的計算機結構中,程序是執行緒的容器。

程序的概念主要有兩點:

  1. 程序是一個實體。每一個程序都有它自己的地址空間。

    一般情況下,程序包括文字區域(text region)、資料區域(data region)和堆疊(stack region)。

    文字區域,包含程式的源指令;

    資料區域,包含了靜態變數;

    堆,動態記憶體分割槽區域;

    棧,動態增長與收縮的段,儲存本地變數;

  2. 程序是一個“執行中的程式”。

    程式是一個沒有生命的實體,只有處理器賦予程式生命時(執行程式),它才能成為一個活動的實體,我們稱其為程序。程式是指令、資料及其組織形式的描述,程序是程式的實體。

程序的基本狀態

  • 就緒狀態,分配了除CPU以外所有的資源,只要獲得cpu即可執行
  • 執行狀態
  • 阻塞狀態,正在執行的程序由於一些事件無法繼續執行,便放棄CPU處於暫停狀態。使程序的執行收到阻塞(如訪問臨界區)
  • 掛起狀態,如發現程式有問題,希望暫時停下來,即暫停執行

同步機制遵循的原則:空閒讓進 忙則等待 有限等待 讓權等待

程序的通訊方式

  • 管道(Pipe)

    管道可用於具有親緣關係程序間的通訊,允許一個程序和另一個與它有共同祖先的程序之間進行通訊

  • 命名管道(namedpipe)

    命名管道克服了管道沒有名字的限制,因此,除了擁有管道的功能外,它還可用於無親緣關係程序間的通訊。命名管道在檔案系統中有對應的檔名

  • 訊號(Signal)

    訊號是比較複雜的通訊方式,用於通知接受程序有某種事件發生,除了用於程序間通訊外,程序還可以傳送訊號給程序本身

  • 訊息佇列

    訊息佇列是訊息的連結表,包括Posix訊息佇列system V訊息佇列。有足夠許可權的程序可以向佇列中新增訊息,被賦予讀許可權的程序則可以讀走佇列中的訊息。

  • 共享記憶體

    使得多個程序可以訪問同一塊記憶體空間,是最快的可用IPC形式。是針對其他通訊機制執行效率較低而設計的。往往與其它通訊機制,如訊號量結合使用,來達到程序間的同步及互斥

  • 記憶體對映(mappedmemory)

    記憶體對映允許任何多個程序間通訊,每一個使用該機制的程序通過把一個共享的檔案對映到自己的程序地址空間

  • 訊號量(semaphore)

    主要作為程序間以及同一程序不同執行緒之間的同步手段

  • 套介面(Socket)

    常見的程序間通訊機制,可用於不同機器之間的程序間通訊

多程序

​ 程序可以建立子程序,子程序是完全獨立執行的實體,每個子程序都擁有自己的私有系統狀態和執行主執行緒。因為子程序是獨立的,所以它可以與父程序併發執行。也就是說,父程序可以處理事件1,同時,子程序可以在後臺處理事件2。

​ 使用多個程序或執行緒時,作業系統負責安排它們的工作。具體做法是:給每個程序(執行緒)安排一個小的時間片,並在所有的任務之間快速輪詢,給每個任務分配一部分可用的CPU時間。如系統同時執行10個程序,作業系統會給每個程序分配1/10的CPU時間,在10個程序之間快速輪詢。在具有多個CPU的系統上,作業系統可以儘可能使用每個CPU,從而併發執行程序。

請記住,在任何一個給定的時刻,程式(程序)都只做一件事情。

Python與併發程式設計

Python支援執行緒,但是Python的執行緒受到很多限制,因為Python直譯器使用了內部的全域性解釋鎖(GIL),Python的執行由Python虛擬機器控制,Python直譯器可以執行多個執行緒,但是任意時刻只允許單個執行緒在直譯器中執行,對Python虛擬機器的訪問由全域性解釋鎖(GIL)控制。

GIL保證同一個時刻僅有一個執行緒在直譯器中執行。無論系統上有多少個CPU,Python只能在一個CPU上執行。

(使用GIL的原因,在多執行緒訪問資料時,保證資料安全)

如果執行緒涉及大量的CPU操作,使用執行緒會降低程式的執行速度,見程式碼。

#!/usr/bin/python
# -*- coding: utf-8 -*-

import time
from threading import Thread
from multiprocessing import Process
from timeit import Timer

def countdown(n):
    while n > 0:
        n -= 1

def t1():
    COUNT=100000000
    thread1 = Thread(target=countdown,args=(COUNT,))
    thread1.start()
    thread1.join()
#    COUNT = 100000000 # 100 million
#    countdown(COUNT)

def t2():
    COUNT=100000000
    thread1 = Thread(target=countdown,args=(COUNT//2,))
    thread2 = Thread(target=countdown,args=(COUNT//2,))
    thread1.start(); thread2.start()
    thread1.join(); thread2.join()

def t3():
    COUNT=100000000
    p1 = Process(target=countdown,args=(COUNT//2,))
    p2 = Process(target=countdown,args=(COUNT//2,))
    p1.start(); p2.start()
    p1.join(); p2.join()

if __name__ == '__main__':
    t = Timer(t1)
    print 'countdown in one thread:',t.timeit(1)
    t = Timer(t2)
    print 'countdown use two thread:',t.timeit(1)
    t = Timer(t3)
    print 'countdown use two Process',t.timeit(1)

    '''
    result:多執行緒最慢,多程序最快
    countdown in one thread:5.18
    countdown use two thread:18.26
    countdown use two Process:3.22
    '''

常見解決GIL鎖的方法:使用多程序,將程式設計為大量獨立的執行緒集合。

multiprocessing模組

multiprocessing模組為在子程序中執行任務、通訊、共享資料,以及執行各種形式的同步提供支援。該模組更適合在UNIX下使用。

這個模組的介面與threading模組的介面類似。但是和執行緒不同,程序沒有任何共享狀態,因此,如果某個程序修改了資料,改動只限於該程序內,並不影響其他程序。

(不同程序內,id(10)是不一樣的,因為每個程序是相互獨立的)

Process

Process(group=None, target=None, name=None, args=(), kwargs={})

這個類構造了一個Process程序。表示一個執行在子程序中的任務,應使用關鍵字引數來指定建構函式中的引數

如果一個類繼承了Process,在進行有關程序的操作前,確保呼叫了Process的建構函式。

group,預留引數,一直為None

target,程序啟動時執行的可呼叫物件,由run()方法呼叫

name,程序名

args,target處可呼叫物件的引數,如果可呼叫物件沒有引數,不用賦值

kwargs,target處可呼叫物件的關鍵字引數

Process的例項p具有的屬性。

  1. p.is_alive()

    如果p在執行,返回True

  2. p.join([timeout])

    等待程序p執行結束

    timeout是可選的超時期限。如果timeout為None,則認為要無限期等待

  3. p.run()

    程序啟動時執行的方法。預設情況下,會呼叫傳遞給Process建構函式中的target

    定義程序的另一種方法是繼承Process並重寫run()方法

  4. p.start()

    執行程序p,並呼叫p.run()

  5. p.terminate()

    強制殺死程序。如果呼叫此方法,程序p將被立即終止,同時不會進行任何清理動作。如果程序p建立了自己的子程序,這些程序將會變成殭屍程序

    此方法要小心使用

    If this method is used when the associated process is using a pipe or queue then the pipe or queue is liable to become corrupted and may become unusable by other process. Similarly, if the process has acquired a lock or semaphore etc. then terminating it is liable to cause other processes to deadlock.

  6. p.authkey

    程序的身份驗證鍵

  7. p.daemon

    守護程序標誌,布林變數。指程序是否為後臺程序。如果該程序為後臺程序(daemon = True),當建立它的Python程序終止時,後臺程序將自動終止

    p.daemon的值要在使用p.start()啟動程序之前設定

    禁止後臺程序建立子程序

  8. p.exitcode

    程序的整數退出碼。如果程序仍在執行,值為None。如果值為-N,表示程序由訊號N終止

  9. p.name

    程序名

  10. p.pid

    程序號

Note that the start(), join(), is_alive(), terminate() and exitcode methods should only be called by the process that created the process object.

code

如果在windows平臺,需要在cmd下執行,不能在idle執行。

#!/usr/bin/python
# -*- coding: utf-8 -*-
import multiprocessing,time

class Clock(multiprocessing.Process):
    times = 0
    def __init__(self,inter):
        multiprocessing.Process.__init__(self)
        self.inter = inter
        Clock.times += 1

    def run(self):
        print id(1)
        while 1:
            print "time%s is %s"%(self.inter,time.ctime())
            time.sleep(self.inter)

if __name__ == "__main__":
    '''不能在idle執行,在dos下執行,將以下注釋開啟/關閉執行結果是不一樣的'''
    a = Clock(1)
    b = Clock(2)
    '''daemon,True = 主程序終止,子程序終止,False = 主程序終止,子程序不會終止'''
    #a.daemon = True
    print a.daemon 
    print b.daemon
    a.start()
    b.start()
    time.sleep(5)
    #b.join()
    print id(1)    #不同程序內,地址是不同的
    print 'finish'

Queue

multiprocessing模組支援的程序間通訊的方式:管道和佇列。這兩種方法都是使用訊息傳遞實現的。

Queue([size])

建立一個共享的程序佇列

size為佇列的最大長度,預設為無大小限制。

底層使用管道,鎖和訊號量實現。利用執行緒將佇列中的資料傳輸到管道。

When a process first puts an item on the queue a feeder thread is started which transfers objects from a buffer into the pipe.

Queue的例項q具有以下方法。

  1. q.qsize()

    返回佇列中成員的數量

    但是此結果並不可靠,因為多執行緒和多程序,在返回結果和使用結果之間,佇列中可能新增/刪除了成員。

  2. q.empty()

    如果呼叫此方法時,q為空,返回True

    但是此結果並不可靠,因為多執行緒和多程序,在返回結果和使用結果之間,佇列中可能新增/刪除了成員。

  3. q.full()

    如果呼叫此方法時,q已滿,返回True

    但是此結果並不可靠,因為多執行緒和多程序,在返回結果和使用結果之間,佇列中可能新增/刪除了成員。

  4. q.put(obj[, block[, timeout]])

    將obj放入佇列中。如果佇列已滿,此方法將阻塞至佇列有空間可用為止

    block,控制阻塞行為,預設為True;如果設定為False,將obj放入佇列時,如果沒有可用空間的話,將引發Queue.Full異常

    timeout為阻塞時間,超時後將引發Queue.Full異常。預設為無限制等待。

  5. q.put_nowait(obj)

    等價於q.put(obj,False)

  6. q.get([block[, timeout]])

    返回q中的一個成員。如果佇列為空,此方法將阻塞至佇列中有成員可用為止

    block控制阻塞行為,預設為True;如果設定為False,如果佇列中沒有可用成員,將引發Queue.Empty異常

    timeout為阻塞的時間,超時後將引發Queue.Empty異常。預設為無限制等待。

  7. q.get_nowait()

    等價於q.(get,False)

  8. q.close()

    關閉佇列,防止佇列中放入更多資料。呼叫此方法時,後臺執行緒將繼續寫入那些已入佇列但尚未寫入的資料,待這些資料寫入完成後將馬上關閉佇列

    q在被垃圾回收時將呼叫此方法

    q被關閉後,q.get()可以正常使用,q.put(),q.qsize(),q.empty(),q.full()等操作會丟擲異常

  9. q.join_thread()

    此方法用於q.close()後,等待後臺執行緒執行完成,阻塞主執行緒至後臺執行緒執行結束,保證緩衝區中的資料放入管道

    呼叫q.cancel_join_thread()可禁止此行為。

    Join the background thread. This can only be used after close() has been called. It blocks until the background thread exits, ensuring that all data in the buffer has been flushed to the pipe.

    By default if a process is not the creator of the queue then on exit it will attempt to join the queue’s background thread. The process can call cancel_join_thread() to make join_thread() do nothing.

  10. q.cancel_join_thread()

    阻止q.join_thread()阻塞主線

    Prevent join_thread() from blocking. In particular, this prevents the background thread from being joined automatically when the process exits

code

#!/usr/bin/python
# -*- coding: utf-8 -*-
import multiprocessing,time
class Consumer(multiprocessing.Process):
    def __init__(self,queue,lock):
        multiprocessing.Process.__init__(self)
        self.queue = queue
        self.lock = lock

    def run(self,):
        times = 5
        while times:
            times -= 1
            i = self.queue.get()
            self.lock.acquire()
            print 'get = %s, %s'%(i,type(i))
            self.lock.release()

class Producer(multiprocessing.Process):
    def __init__(self,queue,lock):
        multiprocessing.Process.__init__(self)
        self.queue = queue
        self.lock = lock

    def run(self,):
        times = 6
        while times:
            times -= 1
            self.queue.put(times)
            self.lock.acquire()
            print 'put = %s'%(times)
            self.lock.release()
            time.sleep(2)

if __name__ == "__main__":
    '''不能在idle執行,在dos下執行'''
    q = multiprocessing.Queue(5)
    lock = multiprocessing.Lock()

    a = Consumer(q,lock)
    a.start()

    b = Producer(q,lock)
    b.start()

    for i in range(2):
        lock.acquire()
        print q.empty(),q.full(),q.qsize()
        lock.release()
        time.sleep(0.5)

    q.close()
    #q.join_thread()
    q.cancel_join_thread()
    try:
        q.put(1)
    except Exception,e:
        lock.acquire()
        print 'put Exception',e
        lock.release()

    try:
        print q.empty(),q.full(),q.qsize()
    except Exception,e:
        lock.acquire()
        print 'else',e
        lock.release()

    print 'join'
    a.join()
    b.join()
    print 'join finish'

JoinableQueue

Queue的子類,比Queue多了task_done()和join()方法。

佇列允許成員的使用者通知生產者成員已經被成功取出。

通知程序使用共享的訊號和條件變數實現。

JoinableQueue的例項q除了與Queue的例項相同的方法外,還具有以下方法

  1. q.task_done()

    使用者使用此方法發出訊號,表示q.get()的成員已經被成功處理

    如果呼叫此方法的次數大於佇列中的刪除的成員數量,將引發ValueError異常

  2. q.join()

    生產者使用此方法進行阻塞,阻塞至佇列中的每個成員均呼叫了q.task_done()為止

code

#!/usr/bin/python
# -*- coding: utf-8 -*-
import multiprocessing,time

class Consumer(multiprocessing.Process):
    def __init__(self,queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self,):
        while 1:
            info = self.queue.get()
            print 'pid = %s,info = %s'%(self.pid,info)
            self.queue.task_done()
            if info == None:
                break
            #self.queue.task_done()  #註釋開啟,報錯,呼叫次數太多
            time.sleep(1)
        print 'pid = %s over'%(self.pid)

if __name__ == "__main__":
    '''不能在idle執行,在dos下執行'''
    q = multiprocessing.JoinableQueue()
    '''兩個程序輪流從佇列中取資料'''
    a = Consumer(q)
    a.start()
    a = Consumer(q)
    a.start()
    for i in range(18):
        print 'put %s'%i
        q.put(i)
        if (i+1) % 3 == 0:  #三個一組,如果佇列不為空,則等待至佇列為空
            q.join()
    q.put(None)
    q.put(None)
    q.join()                #保證佇列中的資料都被處理完
    print 'join finish'

Pipe

管道,程序間訊息傳遞的一種形式。

Pipe([duplex])

在程序之間建立一條管道(用於具有親緣關係程序間的通訊),返回元組(conn1,conn2)

conn1,conn2表示管道兩端的Connection物件。預設情況下,管道是雙向的,如果將duplex置為False,conn1只能用於接收,conn2只能用於傳送。

如果某個Process使用了管道,Pipe必須在Process物件之前建立。

Connection

用於傳送和接收報文或picklable objects。Connection物件通常使用Pipe()建立。

Connection的例項c具有的屬性

  1. c.send(obj)

    向管道另一端的Connection物件傳送訊息

    The obj must be picklable. Very large pickles (approximately 32 MB+, though it depends on the OS) may raise a ValueError exception.

  2. c.recv()

    接收Connection物件傳送的訊息。如果沒有訊息可接收,將阻塞至有訊息為止

    如果連線的另一端已經關閉,並且沒有可接收的物件,將引發EOFError異常。

  3. c.fileno()

    返回連線使用的檔案描述符

  4. c.close()

    關閉連線。

    如果c被垃圾回收機制處理了,將呼叫該方法。

  5. c.poll([timeout])

    返回是否有可讀的可用資料

    timeout為等待的最長時限。預設為立即返回結果,如果將timeout置為None,將無限期等待

  6. c.send_bytes(buffer[, offset[, size]])

    buffer,支援緩衝區介面的物件

    offset,指定緩衝區中的位元組偏移量

    size,傳送位元組數

  7. c.recv_bytes([maxlength])

    maxlength,指定接收的最大位元組數,如果接收到的訊息超過了這個最大值,將引發IOError異常。如果連線的另一端已經關閉,並且沒有可接收的物件,將引發EOFError異常。

  8. c.recv_bytes_into(buffer[, offset])

    接收位元組訊息,並將其儲存在buffer物件中

    offset為緩衝區中放置訊息的位元組偏移。如果接收到的訊息超過了緩衝區長度,將引發BufferTooShort異常

管道由作業系統進行引用計數的,必須在所有程序中關閉管道才能生成EOFError異常。因此只在生產者中關閉管道是不會有效果的,消費者也必須關閉相同的管道才會起作用。

也就是說單方關閉管道沒有任何效果,必須全部程序關閉該管道才會起作用。

#!/usr/bin/python
# -*- coding: utf-8 -*-
import multiprocessing,time
class Consumer(multiprocessing.Process):
    def __init__(self,out,in_put,time):
        multiprocessing.Process.__init__(self)
        self.recv = recv
        self.sleep = time

    def run(self):
        while 1:
            #print 'waiting...'
            item = self.recv.recv()
            if item == None:
                self.recv.close()
                break
            print 'pid = %s,item = %s'%(self.pid,item)
            time.sleep(self.sleep)  
        print 'pid = %s over'%(self.pid)

if __name__ == "__main__":
    '''不能在idle執行,在dos下執行'''
    '''recv只能接受,send只能傳送'''
    recv,send = multiprocessing.Pipe(False)
    a = Consumer(recv,send,0)
    a.start()
    a = Consumer(recv,send,1)
    a.start()
    recv.close()
    for i in range(10) + [None,None]:
        send.send(i)
    send.close()
    print 'send close'
    a.join()
    print 'join finish'

程序池,Pool

Pool([num_processes[, initializer[, initargs[, maxtasksperchild]]]])

建立程序池。

num_processes,表示工作程序個數,預設為None,表示worker程序數為cpu_count()(CPU數量)

initializer,表示工作程序啟動時呼叫的初始化函式

initargs,表示initializer函式的引數,如果initializer不為None,在每個工作程序start之前會呼叫initializer(initargs)

maxtaskperchild,表示每個工作程序在退出/被其他新的程序替代前,需要完成的工作任務數,預設為None,表示工作程序存活時間與pool相同,即不會自動退出/被替換。

當有程序退出時,程序池會建立新的程序,保證程序池中的程序數不變(程序號不會變…)。

程序池內部由多個執行緒互相協作。

程序池物件的方法只能由建立程序池的程序使用

Worker processes within a Pool typically live for the complete duration of the Pool’s work queue. A frequent pattern found in other systems (such as Apache, mod_wsgi, etc) to free resources held by workers is to allow a worker within a pool to complete only a set amount of work before being exiting, being cleaned up and a new process spawned to replace the old one. The *maxtasksperchild*argument to the Pool exposes this ability to the end user.

Pool的例項p具有的屬性

  1. p.apply(func[, args[, kwds]])

    func,表示在程序池中執行的函式

    args、kwds分別表示func的位置引數和關鍵字引數

    與內嵌的apply()函式類似,父程序會被阻塞至func執行結束。

    apply()呼叫的是apply_async(),只不過沒有返回AsyncResult的例項,而是等待func執行結束,返回func的執行結果。

    每次只能向程序池分配一個任務

  2. p.apply_async(func[, args[, kwds[, callback]]])

    func,表示在程序池中執行的函式

    args、kwds分別表示func的位置引數和關鍵字引數

    callback為一個單引數的方法,當結果返回時,callback方法會被呼叫,引數即為任務執行後的結果,callback禁止執行任何阻塞操作,否則將阻塞接收其他非同步操作的結果

    p.apply()的變種,採用非阻塞(非同步)的呼叫方式,非同步地執行函式,返回值為AsyncResult的例項,可用於函式的返回值

    每次只能向程序池分配一個任務

  3. p.map(func, iterable[, chunksize])

    func,表示在程序池中執行的函式
    iterable,func的引數序列
    chunksize,表示將iterable序列按每組chunksize的大小進行分割,每個分割後的序列提交給程序池中的一個程序處理

    注:it supports only one *iterable* argument though

    與內建的map函式基本一致,map為map_async的阻塞版本,直接阻塞至函式全部返回

    一次可分配多個任務到程序池中

  4. p.map_async(func, iterable[, chunksize[, callback]])

    func,表示執行此任務的方法
    iterable,表示任務引數序列
    chunksize,表示將iterable序列按每組chunksize的大小進行分割,每個分割後的序列提交給程序池中的一個任務進行處理
    callback,表示一個單引數的方法,當有結果返回時,callback方法會被呼叫,引數即為任務執行後的結果

    同map函式,但是該方法為非阻塞(非同步),返回值為AsyncResult的例項,稍後可用於獲得結果。

    一次可分配多個任務到程序池中

  5. p.imap(func, iterable[, chunksize])

    與map相同,只不過返回迭代器

  6. p.imap_unordered(func, iterable[, chunksize])

    同imap,返回結果是無序的

  7. p.terminate()

    立即終止程序池中的所有程序,同時不執行任何清理或結束任何掛起工作。

    如果p被垃圾回收時,將自動呼叫此函式。

  8. p.close()

    關閉程序池(pool),使其不在接受新的任務,如果仍有任務在執行,將等待其執行結束

  9. p.join()

    主程序阻塞等待子程序的退出,join方法必須在close或terminate之後使用

AsyncResult

The class of the result returned by Pool.apply_async() and Pool.map_async().

  1. p.get([timeout])

    返回結果,如果有必要則等待結果

    timeout為等待時間,如果在指定的時間內沒有等到結果,將引發異常。

  2. p.wait([timeout])

    等待結果變為可用,即任務執行完成

  3. p.ready()

    如果呼叫完成,返回True

  4. p.successful()

    表示整個呼叫執行狀態,如果程序仍在執行,則丟擲AssertionError異常。

#coding: utf-8
import multiprocessing
import time
def func(msg):
    print "msg:%s,time = %s"%(msg,time.ctime())
    time.sleep(1)

if __name__ == "__main__":
    '''apply_async與apply區別'''
    pool = multiprocessing.Pool(processes = 3)
    t1 = time.time()
    for i in xrange(4):
        msg = "hello %d" %(i)
        pool.apply_async(func, (msg, ))   #維持執行的程序總數為processes,當一個程序執行完畢後會新增新的程序進去
    pool.close()
    pool.join()   #呼叫join之前,先呼叫close函式,否則會出錯。執行完close後不會有新的程序加入到pool,join函式等待所有子程序結束
    t2 = time.time()
    print u"非同步執行時間",t2 - t1

    pool = multiprocessing.Pool(processes = 3)
    t1 = time.time()
    for i in xrange(4):
        msg = "hello %d" %(i)
        pool.apply(func, (msg, ))
    pool.close()
    pool.join()  
    t2 = time.time()
    print u"同步執行時間",t2 - t1
#!/usr/bin/python
# -*- coding: utf-8 -*-

import time,os
from multiprocessing import Pool
def run(a):
  #print os.getpid(),time.ctime()
  time.sleep(1)
  return a * 2

if __name__ == "__main__":
  l = range(10)    
  pool = Pool(5)  #建立擁有5個程序數量的程序池
  e1 = time.time()
  '''程式中的r1表示全部程序執行結束後全域性的返回結果集,run函式有返回值,所以一個程序對應一個返回結果,
  這個結果存在一個列表中,也就是一個結果堆中,實際上是用了佇列的原理,等待所有程序都執行完畢,就返回這個列表(列表的順序不定)。'''
  r = pool.map(run, l)
  print 'pass time = %s'%(time.time() - e1)
  print u"map exe time = %s"%(time.time()-e1)
  print r

  print "======================="
  e1 = time.time()
  '''程式中的r1表示全部程序執行結束後全域性的返回結果集,run函式有返回值,所以一個程序對應一個返回結果,
  這個結果存在一個列表中,也就是一個結果堆中,實際上是用了佇列的原理,等待所有程序都執行完畢,就返回這個列表(列表的順序不定)。'''
  r = pool.map_async(run,l)
  print type(r),r.ready()
  print 'pass time = %s'%(time.time() - e1)
  r.wait()
  print r.ready()
  e2 = time.time()
  print u"map_async exe time = %s"%(e2-e1)
  print r
  print r.get()

  pool.close()#關閉程序池,不再接受新的程序
  pool.join() #主程序阻塞等待子程序的退出

共享資料與同步

通常,程序之間是相互獨立的。但是通過共享記憶體(nmap模組),程序之間可以共享物件,使多個程序可以訪問同一個變數(地址相同,變數名可能不同)。

多程序共享資源必然會導致程序間相互競爭,所以應該盡最大可能防止使用共享狀態。

It is possible to create shared objects using shared memory which can be inherited by child processes.

Value

Value((typecode_or_type, args[, lock])

Return a ctypes object allocated from shared memory. By default the return value is actually a synchronized wrapper for the object.

在共享記憶體中建立ctypes()物件

typecode_or_type,決定返回物件的型別,array模組使用的型別程式碼(如’i’,’d’等)或者ctypes模組的型別物件。

args,傳遞給typecode_or_type建構函式的引數

lock,預設為True,建立一個互斥鎖來限制對Value物件的訪問,如果傳入一個鎖,如Lock或RLock的例項,將用於同步。如果傳入False,Value的例項就不會被鎖保護,它將不是程序安全的。

Array()

Array(typecode_or_type, size_or_initializer, **kwds)

Returns a synchronized shared array

Array型別

Type code     C Type           Minimum size in bytes 
'c'         character            1 
'b'         signed integer       1 
'B'         unsigned integer     1 
'u'         Unicode character    2 
'h'         signed integer       2 
'H'         unsigned integer     2 
'i'         signed integer       2 
'I'         unsigned integer     2 
'l'         signed integer       4 
'L'         unsigned integer     4 
'f'         floating point       4 
'd'         floating point       8 
#!/usr/bin/python
# -*- coding: utf-8 -*-
import multiprocessing
def f(n, a):
    n.value   = 3.14
    a[0]      = 5

if __name__ == '__main__':
    num   = multiprocessing.Value('d', 0.0)
    arr   = multiprocessing.Array('i', range(10))
    p = multiprocessing.Process(target=f, args=(num, arr))
    p.start()
    p.join()
    print num.value
    print arr[:]

同步

以下為multiprocessing模組提供的同步原語

原語 描述
Lock 互斥鎖
RLock 可重入的互斥鎖(同一個程序可以多次獲得它,同時不會造成阻塞)
Semaphore 訊號量
BoundedSemaphore 有邊界的訊號量
Event 事件
Condition 條件變數

託管物件

和執行緒不同,程序不支援共享物件。儘管可以利用共享記憶體建立共享值和陣列,但是對Python物件不起作用。

multiprocessing模組提供了一種共享Python物件的途徑,但是它們得在管理器的控制下。

管理器是獨立執行的子程序,其中存在真實的物件,並以伺服器的形式執行,其他程序通過使用代理訪問共享物件,這些代理作為客戶端執行。

使用簡單託管物件的最直觀方式就是Manager()函式。

Manager()

BaseManager的子類,返回一個啟動的SyncManager()例項,可用於建立共享物件並返回訪問這些共享物件的代理。

Returns a started SyncManager object which can be used for sharing objects between processes. The returned manager object corresponds to a spawned child process and has methods which will create shared objects and return corresponding proxies.

BaseManager

BaseManager([address[, authkey]]),建立管理器伺服器的基類

address = (hostname,port),指定伺服器的網址地址,預設為簡單分配一個空閒的埠

authkey,連線到伺服器的客戶端的身份驗證,預設為current_process().authkey的值

BaseManager的例項m具有以下屬性

  1. start([initializer[, initargs]])

    啟動一個單獨的子程序,並在該子程序中啟動管理器伺服器

  2. get_server()

    獲取伺服器物件

  3. connect()

    連線管理器物件

  4. shutdown()

    關閉管理器物件,只能在呼叫了start()方法之後呼叫

  5. address

    只讀屬性,管理器伺服器正在使用的地址

SyncManager

SyncManager的例項m具有以下屬性

以下型別均不是程序安全的,需要加鎖..

  1. Array(self,*args,**kwds)
  2. BoundedSemaphore(self,*args,**kwds)
  3. Condition(self,*args,**kwds)
  4. Event(self,*args,**kwds)
  5. JoinableQueue(self,*args,**kwds)
  6. Lock(self,*args,**kwds)
  7. Namespace(self,*args,**kwds)
  8. Pool(self,*args,**kwds)
  9. Queue(self,*args,**kwds)
  10. RLock(self,*args,**kwds)
  11. Semaphore(self,*args,**kwds)
  12. Value(self,*args,**kwds)
  13. dict(self,*args,**kwds)
  14. list(self,*args,**kwds)
#!/usr/bin/python
# -*- coding: utf-8 -*-
import multiprocessing
def f(x, arr, l,d,n):
    x.value = 3.14
    arr[0] = 5
    l.append('Hello')
    d[1] = 2
    n.a = 10

if __name__ == '__main__':
    server = multiprocessing.Manager()
    x    = server.Value('d', 0.0)
    arr  = server.Array('i', range(10))
    l    = server.list()
    d    = server.dict()
    n    = server.Namespace()

    proc = multiprocessing.Process(target=f, args=(x, arr, l,d,n))
    proc.start()
    proc.join()

    print x.value
    print arr
    print l
    print d
    print n
#!/usr/bin/python
# -*- coding: utf-8 -*-
import multiprocessing,time
def watch(d,event):
    while 1:
        event.wait()
        print d
        event.clear()

if __name__ == "__main__":
    m = multiprocessing.Manager()
    d = m.dict()
    event = m.Event()

    p = multiprocessing.Process(target = watch,args = (d,event))
    p.daemon = True
    p.start()

    d['1'] = 1
    event.set()
    time.sleep(1)

    d['2'] = 2
    event.set()
    time.sleep(1)
    p.terminate()
    m.shutdown()
    print "finish"
#!/usr/bin/python
# -*- coding: utf-8 -*-

import multiprocessing,time

def add(d,times,l):
    while times > 0:
        times -= 1
        l.acquire()
        d.value += 1
        l.release()

if __name__ == "__main__":
    '''驗證Value等共享物件是不是程序安全的,結果不是程序安全的,如果將鎖去掉,結果不等於20000'''
    m = multiprocessing.Manager()
    d = m.Value('d', 0.0)
    l = m.Lock()
    times = 10000
    p = multiprocessing.Process(target = add,args = (d,times,l))
    p.daemon = True
    p.start()
    while times > 0:
        times -= 1
        l.acquire()
        d.value += 1
        l.release()
    p.join()
    print d
    m.shutdown()   #如果不用了,關閉伺服器端- -
    print "finish"

使用多程序的建議

  1. 最好閱讀官方文件,官方文件更全面;
  2. 確保程序間傳遞的所有資料都能夠序列化;
  3. 儘可能的避免使用共享資料,儘可能使用訊息傳遞和佇列。使用訊息傳遞時,不必擔心同步等問題。當程序數增加時,還能提供更好的擴充套件;
  4. 注意關閉程序的方式,需要顯式地關閉程序,並使用一種良好的終止模式,而不要僅僅依賴垃圾收集或被迫使用terminate()操作強制終止子程序;
  5. 管理器和代理的使用與分散式計算密切相關;
  6. multiprocessing源於pyprocessing的第三方庫;
  7. 此模組更適用於UNIX系統;
  8. 最重要的一點,儘量讓事情變得簡單;

練習code

#!/usr/bin/python
# -*- coding: utf-8 -*-
from Tkinter import *
import threading,multiprocessing,time

def recv(queue,var):
    while True:
        s = queue.get()
        var.set(s)
        if s == 10000:
            break
    print 'recv over',id(5000)

def send(queue,start,end):
    for i in range(start,end):
        #time.sleep(1)
        print "put %s"%i
        queue.put(i)
    print "send over",id(5000)

if __name__ == "__main__":
    root = Tk()
    root.title("hello world")
    root.geometry('300x100')
    var = StringVar()
    e = Entry(root, textvariable = var)
    var.set("hello")
    e.pack(side = TOP)

    l = []
    queue = multiprocessing.Queue(5)
    p = multiprocessing.Process(target = send,args = (queue,10,5000))
    p.daemon = True
    p.start()

    p = multiprocessing.Process(target = send,args = (queue,5000,10001))
    p.daemon = True
    p.start()

    thread = threading.Thread(target = recv,args = (queue,var))
    thread.daemon = True
    thread.start()

    queue.put('a')
    root.mainloop()
    queue.close()
    p.join()
    thread.join()
    print 'over',l,id(5000)

如果覺得本文對您有幫助,請點選‘頂’支援一下,您的支援是我寫作最大的動力,謝謝。

參考網址

官方文件

同步,非同步,阻塞,非阻塞

GIL鎖

程序池原始碼分析

參考書籍

  1. Python參考手冊(第四版)