Python多程序模組multiprocessing
概述:
Python中的多程序由multiprocessing模組提供,multiprocessing模組中通過Process和Pool兩個類提供多程序服務,除了這兩個類以外,還提供了程序鎖(RLock
,Lock
,Event
,Condition
和 Semaphore)
和程序間資料共享(Array、Manager和queues的Queue類)等工具;
由於在Linux中,每一個子程序的資料都是由父程序提供的,每啟動一個子程序都克隆一份父程序資料,這樣以來,當程式建立大量程序時會消耗大量系統資源,導致系統資源耗盡,此時就出現了multiprocessing模組中Process和Pool兩個類,具體這兩個類的功能在後續中介紹;
Process類:
Process類的實現是基於fork機制,因此不被windows平臺支援
一、使用Process建立多執行緒:
import os import multiprocessing def foo(i): # 同樣的引數傳遞方法 print("這裡是 ", multiprocessing.current_process().name) print('模組名稱:', __name__) print('父程序 id:', os.getppid()) # 獲取父程序id print('當前子程序 id:', os.getpid()) # 獲取自己的程序id print('------------------------') if __name__ == '__main__': for i in range(5): p = multiprocessing.Process(target=foo, args=(i,)) p.start()
輸出結果:
這裡是 Process-2 模組名稱: __mp_main__ 父程序 id: 880 當前子程序 id: 5260 -------------- 這裡是 Process-3 模組名稱: __mp_main__ 父程序 id: 880 當前子程序 id: 4912 -------------- 這裡是 Process-4 模組名稱: __mp_main__ 父程序 id: 880 當前子程序 id: 5176 -------------- 這裡是 Process-1 模組名稱: __mp_main__ 父程序 id: 880 當前子程序 id: 5380 -------------- 這裡是 Process-5 模組名稱: __mp_main__ 父程序 id: 880 當前子程序 id: 3520 --------------
二、程序池Pool類:
上面講到:程式過多的建立程序導致系統資源過多的被佔用,影響系統執行,此時就是體現Pool類的重要性的時候了,通常我們在使用C語言程式設計時會建立一個程序池,在這裡Python直接封裝了這個程序池,方便了我們使用,Pool類就是為程式建立一定數目的程序,程序池內部維護了一個程序序列,需要時就去程序池中拿取一個程序,如果程序池序列中沒有可供使用的程序,那麼程式就會等待,直到程序池中有可用程序為止。
程序池常用的方法:
- apply() 同步執行(序列)
- apply_async() 非同步執行(並行)
- terminate() 立刻關閉程序池
- join() 主程序等待所有子程序執行完畢。必須在close或terminate()之後。
- close() 等待所有程序結束後,才關閉程序池
def fun_test(time_f):
#print "pid %s start at %s :", os.gitpid(),time.
time.sleep(time_f)
print "pid num is :", os.getpid()
if __name__ == '__main__':
pool = multiprocessing.Pool(2)
amount = raw_input("Please input pages :")
for i in range(int(amount)):
pool.apply_async(fun_test, (time_t, ))
pool.close()
pool.join()
輸出結果:
Please input pages :2
pid num is : 20149
pid num is : 20150
兩個程序非同步進行,當執行上述程式碼是可以看到兩個程序同時結束;需要注意的是程式需要先呼叫pool.close關閉從程序池中繼續啟用程序,然後呼叫pool.join等待子程序全部結束;
三、程序間的資料共享
在Linux中,每個子程序的資料都是由父程序提供的,每啟動一個子程序就從父程序克隆一份資料。
建立一個程序需要非常大的開銷,每個程序都有自己獨立的資料空間,不同程序之間通常是不能共享資料的,要想共享資料,一般通過中介軟體來實現。
3.1使用Array共享資料
對於Array陣列類,括號內的“i”表示它內部的元素全部是int型別,而不是指字元“i”,陣列內的元素可以預先指定,也可以只指定陣列的長度。Array類在例項化的時候必須指定陣列的資料型別和陣列的大小,類似temp = Array('i', 5)
。對於資料型別有下面的對應關係:
'c': ctypes.c_char, 'u': ctypes.c_wchar, 'b': ctypes.c_byte, 'B': ctypes.c_ubyte, 'h': ctypes.c_short, 'H': ctypes.c_ushort, 'i': ctypes.c_int, 'I': ctypes.c_uint, 'l': ctypes.c_long, 'L': ctypes.c_ulong, 'f': ctypes.c_float, 'd': ctypes.c_double
下面是一個多程序修改同一個陣列的例子:
from multiprocessing import Process
from multiprocessing import Array
def func(i,temp):
temp[0] += 100
print("程序%s " % i, ' 修改陣列第一個元素後----->', temp[0])
if __name__ == '__main__':
temp = Array('i', [1, 2, 3, 4])
for i in range(10):
p = Process(target=func, args=(i, temp))
p.start()
執行結果:
程序2 修改陣列第一個元素後-----> 101
程序4 修改陣列第一個元素後-----> 201
程序5 修改陣列第一個元素後-----> 301
程序3 修改陣列第一個元素後-----> 401
程序1 修改陣列第一個元素後-----> 501
程序6 修改陣列第一個元素後-----> 601
程序9 修改陣列第一個元素後-----> 701
程序8 修改陣列第一個元素後-----> 801
程序0 修改陣列第一個元素後-----> 901
程序7 修改陣列第一個元素後-----> 1001
3.2、使用Manager共享資料
通過Manager類也可以實現程序間資料的共享,Manager()返回的manager物件提供一個服務程序,使得其他程序可以通過代理的方式操作Python物件。manager物件支援
list
, dict
, Namespace
, Lock
, RLock
, Semaphore
, BoundedSemaphore
, Condition
, Event
, Barrier
, Queue
, Value
,Array
等多種格式
from multiprocessing import Process
from multiprocessing import Manager
def func(i, dic):
dic["num"] = 100+i
print(dic.items())
if __name__ == '__main__':
dic = Manager().dict()
for i in range(10):
p = Process(target=func, args=(i, dic))
p.start()
p.join()
執行結果:
[('num', 100)]
[('num', 101)]
[('num', 102)]
[('num', 103)]
[('num', 104)]
[('num', 105)]
[('num', 106)]
[('num', 107)]
[('num', 108)]
[('num', 109)]
3.3使用queues的Queue類共享資料
multiprocessing的queues模組,提供了一個Queue佇列類,可以實現程序間的資料共享,如下例所示:
import multiprocessing
from multiprocessing import Process
from multiprocessing import queues
def func(i, q):
ret = q.get()
print("程序%s從佇列裡獲取了一個%s,然後又向佇列裡放入了一個%s" % (i, ret, i))
q.put(i)
if __name__ == "__main__":
lis = queues.Queue(20, ctx=multiprocessing)
lis.put(0)
for i in range(10):
p = Process(target=func, args=(i, lis,))
p.start()
執行結果:
程序1從佇列裡獲取了一個0,然後又向佇列裡放入了一個1
程序4從佇列裡獲取了一個1,然後又向佇列裡放入了一個4
程序2從佇列裡獲取了一個4,然後又向佇列裡放入了一個2
程序6從佇列裡獲取了一個2,然後又向佇列裡放入了一個6
程序0從佇列裡獲取了一個6,然後又向佇列裡放入了一個0
程序5從佇列裡獲取了一個0,然後又向佇列裡放入了一個5
程序9從佇列裡獲取了一個5,然後又向佇列裡放入了一個9
程序7從佇列裡獲取了一個9,然後又向佇列裡放入了一個7
程序3從佇列裡獲取了一個7,然後又向佇列裡放入了一個3
程序8從佇列裡獲取了一個3,然後又向佇列裡放入了一個8
關於queue和Queue,在Python庫中非常頻繁的出現,很容易就搞混淆了。甚至是multiprocessing自己還有一個Queue類(大寫的Q),一樣能實現queues.Queue
的功能,匯入方式是from multiprocessing import Queue
。
參考資料:
將的很詳細,也請大家關注他的其他文章,寫的都很好;