1. 程式人生 > >Python多程序模組multiprocessing

Python多程序模組multiprocessing

概述:

  Python中的多程序由multiprocessing模組提供,multiprocessing模組中通過Process和Pool兩個類提供多程序服務,除了這兩個類以外,還提供了程序鎖(RLockLockEventCondition和 Semaphore)和程序間資料共享(Array、Manager和queues的Queue類)等工具;

  由於在Linux中,每一個子程序的資料都是由父程序提供的,每啟動一個子程序都克隆一份父程序資料,這樣以來,當程式建立大量程序時會消耗大量系統資源,導致系統資源耗盡,此時就出現了multiprocessing模組中Process和Pool兩個類,具體這兩個類的功能在後續中介紹;

Process類:

Process類的實現是基於fork機制,因此不被windows平臺支援

一、使用Process建立多執行緒:

import os
import multiprocessing

def foo(i):
    # 同樣的引數傳遞方法
    print("這裡是 ", multiprocessing.current_process().name)
    print('模組名稱:', __name__)
    print('父程序 id:', os.getppid())  # 獲取父程序id
    print('當前子程序 id:', os.getpid())  # 獲取自己的程序id
    print('------------------------')

if __name__ == '__main__':

    for i in range(5):
        p = multiprocessing.Process(target=foo, args=(i,))
        p.start()

輸出結果:

這裡是  Process-2
模組名稱: __mp_main__
父程序 id: 880
當前子程序 id: 5260
--------------
這裡是  Process-3
模組名稱: __mp_main__
父程序 id: 880
當前子程序 id: 4912
--------------
這裡是  Process-4
模組名稱: __mp_main__
父程序 id: 880
當前子程序 id: 5176
--------------
這裡是  Process-1
模組名稱: __mp_main__
父程序 id: 880
當前子程序 id: 5380
--------------
這裡是  Process-5
模組名稱: __mp_main__
父程序 id: 880
當前子程序 id: 3520
--------------

二、程序池Pool類:

上面講到:程式過多的建立程序導致系統資源過多的被佔用,影響系統執行,此時就是體現Pool類的重要性的時候了,通常我們在使用C語言程式設計時會建立一個程序池,在這裡Python直接封裝了這個程序池,方便了我們使用,Pool類就是為程式建立一定數目的程序,程序池內部維護了一個程序序列,需要時就去程序池中拿取一個程序,如果程序池序列中沒有可供使用的程序,那麼程式就會等待,直到程序池中有可用程序為止。

程序池常用的方法:

  • apply() 同步執行(序列)
  • apply_async() 非同步執行(並行)
  • terminate() 立刻關閉程序池
  • join() 主程序等待所有子程序執行完畢。必須在close或terminate()之後。
  • close() 等待所有程序結束後,才關閉程序池
def fun_test(time_f):
    #print "pid %s start at %s :", os.gitpid(),time.
	time.sleep(time_f)
	print "pid num is :", os.getpid()

if __name__ == '__main__':
	pool = multiprocessing.Pool(2)
	amount = raw_input("Please input pages :")
	for i in range(int(amount)):
			pool.apply_async(fun_test, (time_t, ))
	pool.close()
	pool.join()

輸出結果:

Please input pages :2
pid num is : 20149
pid num is : 20150

兩個程序非同步進行,當執行上述程式碼是可以看到兩個程序同時結束;需要注意的是程式需要先呼叫pool.close關閉從程序池中繼續啟用程序,然後呼叫pool.join等待子程序全部結束;

三、程序間的資料共享

在Linux中,每個子程序的資料都是由父程序提供的,每啟動一個子程序就從父程序克隆一份資料。

建立一個程序需要非常大的開銷,每個程序都有自己獨立的資料空間,不同程序之間通常是不能共享資料的,要想共享資料,一般通過中介軟體來實現。

3.1使用Array共享資料

對於Array陣列類,括號內的“i”表示它內部的元素全部是int型別,而不是指字元“i”,陣列內的元素可以預先指定,也可以只指定陣列的長度。Array類在例項化的時候必須指定陣列的資料型別和陣列的大小,類似temp = Array('i', 5)。對於資料型別有下面的對應關係:

'c': ctypes.c_char, 'u': ctypes.c_wchar,
'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
'h': ctypes.c_short, 'H': ctypes.c_ushort,
'i': ctypes.c_int, 'I': ctypes.c_uint,
'l': ctypes.c_long, 'L': ctypes.c_ulong,
'f': ctypes.c_float, 'd': ctypes.c_double

下面是一個多程序修改同一個陣列的例子:

from multiprocessing import Process
from multiprocessing import Array

def func(i,temp):
    temp[0] += 100
    print("程序%s " % i, ' 修改陣列第一個元素後----->', temp[0])

if __name__ == '__main__':
    temp = Array('i', [1, 2, 3, 4])
    for i in range(10):
        p = Process(target=func, args=(i, temp))
        p.start()

執行結果:

程序2   修改陣列第一個元素後-----> 101
程序4   修改陣列第一個元素後-----> 201
程序5   修改陣列第一個元素後-----> 301
程序3   修改陣列第一個元素後-----> 401
程序1   修改陣列第一個元素後-----> 501
程序6   修改陣列第一個元素後-----> 601
程序9   修改陣列第一個元素後-----> 701
程序8   修改陣列第一個元素後-----> 801
程序0   修改陣列第一個元素後-----> 901
程序7   修改陣列第一個元素後-----> 1001

3.2、使用Manager共享資料

  通過Manager類也可以實現程序間資料的共享,Manager()返回的manager物件提供一個服務程序,使得其他程序可以通過代理的方式操作Python物件。manager物件支援

listdictNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventBarrierQueueValue ,Array等多種格式

from multiprocessing import Process
from multiprocessing import Manager

def func(i, dic):
    dic["num"] = 100+i
    print(dic.items())

if __name__ == '__main__':
    dic = Manager().dict()
    for i in range(10):
        p = Process(target=func, args=(i, dic))
        p.start()
        p.join()

執行結果:

[('num', 100)]
[('num', 101)]
[('num', 102)]
[('num', 103)]
[('num', 104)]
[('num', 105)]
[('num', 106)]
[('num', 107)]
[('num', 108)]
[('num', 109)]

3.3使用queues的Queue類共享資料

multiprocessing的queues模組,提供了一個Queue佇列類,可以實現程序間的資料共享,如下例所示:

import multiprocessing
from multiprocessing import Process
from multiprocessing import queues

def func(i, q):
    ret = q.get()
    print("程序%s從佇列裡獲取了一個%s,然後又向佇列裡放入了一個%s" % (i, ret, i))
    q.put(i)

if __name__ == "__main__":
    lis = queues.Queue(20, ctx=multiprocessing)
    lis.put(0)
    for i in range(10):
        p = Process(target=func, args=(i, lis,))
        p.start()

執行結果:

程序1從佇列裡獲取了一個0,然後又向佇列裡放入了一個1
程序4從佇列裡獲取了一個1,然後又向佇列裡放入了一個4
程序2從佇列裡獲取了一個4,然後又向佇列裡放入了一個2
程序6從佇列裡獲取了一個2,然後又向佇列裡放入了一個6
程序0從佇列裡獲取了一個6,然後又向佇列裡放入了一個0
程序5從佇列裡獲取了一個0,然後又向佇列裡放入了一個5
程序9從佇列裡獲取了一個5,然後又向佇列裡放入了一個9
程序7從佇列裡獲取了一個9,然後又向佇列裡放入了一個7
程序3從佇列裡獲取了一個7,然後又向佇列裡放入了一個3
程序8從佇列裡獲取了一個3,然後又向佇列裡放入了一個8

關於queue和Queue,在Python庫中非常頻繁的出現,很容易就搞混淆了。甚至是multiprocessing自己還有一個Queue類(大寫的Q),一樣能實現queues.Queue的功能,匯入方式是from multiprocessing import Queue

參考資料:

將的很詳細,也請大家關注他的其他文章,寫的都很好;