1. 程式人生 > >python中multiprocessing、multiprocessing.dummy和threading用法筆記

python中multiprocessing、multiprocessing.dummy和threading用法筆記

一、multiprocessing

用法參考地址:multiprocessing用法
首先解釋一個誤區:
程序池的大小是每次同時執行的程序數,但是並不會影響主程序申請程序的數量。主程序申請多程序量不等於池子大小。

1、子程序無返回值

# -*- coding:utf-8 -*-
from multiprocessing import Pool as Pool
import time

def func(msg):
    print 'msg:', msg
    time.sleep(2)
    print 'end:'
    
pool = Pool(processes=3)
for i in xrange(1, 5):
    msg = 'hello %d' % (i)
    pool.apply_async(func,(msg,))   # 非阻塞
    # pool.apply(func,(msg,))       # 阻塞,apply()源自內建函式,用於間接的呼叫函式,並且按位置把元祖或字典作為引數傳入。
    # pool.imap(func,[msg,])        # 非阻塞, 注意與apply傳的引數的區別
    # pool.map(func, [msg, ])       # 阻塞

print 'Mark~~~~~~~~~~~~~~~'
pool.close()
pool.join()  # 呼叫join之前,先呼叫close函式,否則會出錯。執行完close後不會有新的程序加入到pool,join函式等待所有子程序結束
print 'sub-process done'
  1. 非阻塞方法
    multiprocessing.Pool.apply_async() 和 multiprocessing.Pool.imap()
    程序併發執行
  2. 阻塞方法
    multiprocessing.Pool.apply()和 multiprocessing.Pool.map()
    程序順序執行

2、子程序有返回值

只有apply_async可以有返回值,apply,map,imap不可以設定返回值.

# -*- coding:utf-8 -*-
from multiprocessing import Pool as Pool
import time

def func(msg):
    print 'msg:', msg
    time.sleep(2)
    print 'end:'
    return msg

pool = Pool(processes=3)

result = []
for i in xrange(1, 5):
    msg = 'hello %d' % (i)
    res = pool.apply_async(func,(msg,))   # 非阻塞 只有apply_async可以有返回值,apply,map,imap不可以設定返回值
    result.append(res)

print 'Mark~~~~~~~~~~~~~~~'
pool.close()
pool.join()  # 呼叫join之前,先呼叫close函式,否則會出錯。執行完close後不會有新的程序加入到pool,join函式等待所有子程序結束
for res in result:
    print "sub_process return: ", res.get()
print 'sub-process done'

一定要注意res.get()方法是堵塞的。只有子程序執行完畢並返回資料時 res.get()方法才會執行,否則主程序堵塞,並等待。
看下面這個程式: 如何高效處理子程序有返回值的多程序任務

from multiprocessing import Pool
import Queue
import time


def test(p):
    time.sleep(0.5)
    if p == 100:
        return (p,True)
    else:
        return (p,False)


if __name__ == "__main__":
    pool = Pool(processes=10)
    q = Queue.Queue()
    for i in xrange(500):
        # 將子程序物件存入佇列中。
        q.put( pool.apply_async(test, args=(i,)) )  # 維持執行的程序總數為10,當一個程序執行完後新增新程序.
        print(i)
    '''
    因為這裡使用的為pool.apply_async非同步方法,因此子程序執行的過程中,父程序會執行while,獲取返回值並校驗。
    '''
    print("======", q.qsize())
    while 1:
        a = q.get().get();
        print(a)
        if a[1]:
            pool.terminate()  # 結束程序池中的所有子程序。
            break
    pool.join()

該程式瞬間執行到 print("======", q.qsize()) 行,並且每次執行 a = q.get().get()程式碼時,如果對應程序沒有執行完,即沒有返回輸出值時,該行程式碼導致主程序堵塞等待。

如果需要申請龐大的程序數量時,就會很浪費資源比如下面:

for i in xrange(500000000):
        # 將子程序物件存入佇列中。
        q.put( pool.apply_async(test, args=(i,)) )  # 維持執行的程序總數為10,當一個程序執行完後新增新程序.
        print(i)

我們可以開啟2個執行緒,一個執行緒申請程序,另一個執行緒判斷結束所有子程序的程序是否已經到達。
如下:

from multiprocessing import Pool
import Queue
import threading
import time
def test(p):
    time.sleep(0.001)
    if p == 10000:
        return True
    else:
        return False
        
if __name__ == "__main__":
    result = Queue.Queue()  # 佇列
    pool = Pool()
    
    def pool_th():
        for i in xrange(50000000000):  ##這裡需要建立執行的子程序非常多
            try:
                result.put(pool.apply_async(test, args=(i,)))
            except:
                break
                
    def result_th():
        while 1:
            a = result.get().get()  # 獲取子程序返回值
            if a:
                pool.terminate()  # 結束所有子程序
                break
    '''
    利用多執行緒,同時執行Pool函式建立執行子程序,以及執行獲取子程序返回值函式。
    '''
    t1 = threading.Thread(target=pool_th)
    t2 = threading.Thread(target=result_th)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    pool.join()

3、多程序共享資源

申請程序有兩種方式一種是multiprocessing.Process(),另一種是multiprocessing.Pool(process=3).apply_async().
multiprocessing提供三種多程序之間共享資料的資料結構: Queue, Array 和Manager.

from multiprocessing import Queue, Array, Manager

Queue、和Array只適用Process類申請的多程序共享資源。
Manager可以適用Pool和Process類申請的多程序共享資源。

import time
from multiprocessing import Manager, Pool

lists = Manager().list()  # 定義可被子程序共享的全域性變數lists

def func(i):
    # time.sleep(1)
    lists.append(i)
    print i

pool = Pool(processes=3)

for i in xrange(10000000):
    if len(lists) <= 0:
        pool.apply_async(func, args=(i,))
    else:
        break

pool.close()
pool.join()
print(lists)

輸出結果為:且i最大值不定。主程序申請多程序量不等於池子大小。
在這裡插入圖片描述

二、多執行緒 Multiprocessing.dummy

1、子程序無返回值

Multiprocessing.dummy.Pool() 與Multiprocessing.Pool() 的用法一樣

  1. 非阻塞方法
    multiprocessing.dummy.Pool.apply_async() 和 multiprocessing.dummy.Pool.imap()
    執行緒併發執行
  2. 阻塞方法
    multiprocessing.dummy.Pool.apply()和 multiprocessing.dummy.Pool.map()
    執行緒順序執行

2、子程序有返回值

與多程序一樣,只有multiprocessing.dummy.Pool.apply_async()可以有返回值,apply,map,imap不可以設定返回值.
3、多程序共享資源

三、多執行緒 Threading

1、建立方法

  1. 直接使用Thread類
from threading import Thread
import time
def run(a = None, b = None) :
  print a, b 
  time.sleep(1)

t = Thread(target = run, args = ("this is a", "thread"))
#此時執行緒是新建狀態

print t.getName()#獲得執行緒物件名稱
print t.isAlive()#判斷執行緒是否還活著。
t.start()#啟動執行緒
t.join()#等待其他執行緒執行結束
  1. 繼承Thread類
from threading import Thread
import time
class MyThread(Thread) :
  def __init__(self, a) :
    super(MyThread, self).__init__()
    #呼叫父類的構造方法
    self.a = a

  def run(self) :
    print "sleep :", self.a
    time.sleep(self.a)

t1 = MyThread(2)
t2 = MyThread(4)
t1.start()
t2.start()
t1.join()
t2.join()