1. 程式人生 > >python中的多程序和多執行緒

python中的多程序和多執行緒

作者:liuyazhuang  來源:CSDN  原文:https://blog.csdn.net/l1028386804/article/details/83042246?utm_source=copy  轉載出處:https://blog.csdn.net/l1028386804/article/details/83042246

一、多程序 Python實現對程序的方式主要有兩種,一種方法是使用os模組中的fork方法,另一種方法是使用multiprocessing模組。區別在於:前者僅適用於Unix/Linux作業系統,對Windows不支援,後者則是跨平臺的實現方式。

1、使用os模組中的fork方式實現多程序 Python的os模組封裝了常見的系統呼叫,其中就有fork方法。fork方法來自於Unix/Linux作業系統中提供的一個fork系統呼叫,這個方法很特殊。普通的方法都是呼叫一次,返回一次,而fork是呼叫一次,返回兩次,原因在與作業系統將當前程序(父程序)複製出一份程序(子程序),這兩個程序幾乎完全相同,於是fork方法分別在父程序和子程序中返回。子程序中永遠返回0,父程序中返回的是子程序的ID。

示例:使用fork方法建立程序,其中os模組的getpid方法用於獲取當前程序的ID, getppid方法用於獲取父程序ID,程式碼如下:

# -*- coding:UTF-8 -*-
import os
 
if __name__ == "__main__":
    print 'current Process (%s) start ...' %(os.getpid())
    pid = os.fork()
    if pid < 0:
        print 'error in fork'
    elif pid == 0:
        print 'I am child process(%s) and my parent process is (%s)', (os.getpid(), os.getppid())
    else:
        print 'I(%s) created a child process (%s).', (os.getpid(), pid)

2、使用multiprocessing模組建立多程序 multiprocessing模組提供了一個Process類來描述一個程序物件。建立子程序時,只需要傳入一個執行函式和函式的引數,即可完成一個Process例項的建立,用start()方法啟動程序,用join()方法實現程序間的通訊。 示例如下:

# -*- coding:UTF-8 -*-
import os
from multiprocessing import Process
 
#子程序要執行的程式碼
def run_proc(name):
    print 'child process %s (%s) Running...' %(name, os.getpid())
 
if __name__ == '__main__':
    print 'Parent process %s.' % os.getpid()
    for i in  range(5):
        p = Process(target=run_proc, args=(str(i),))
        print 'Process will start'
        p.start()
    p.join()
    print 'Process end.'

3、使用multiprocessing模組提供了一個Pool物件來代表程序池物件 Pool可以提供指定數量的程序供使用者呼叫,預設大小是CPU核數。當有新的請求提交到Pool時,如果池還沒有滿,那麼就建立一個新的程序用來執行該請求;但如果池中的程序已經達到規定的最大值,那麼該請求就會等待,直到池中有程序結束,才會建立新的程序來處理它。 示例如下:

# -*- coding:UTF-8 -*-
 
from multiprocessing import Pool
import os, time, random
 
def run_task(name):
    print 'Task %s (pid = %s) is running...' % (name, os.getpid())
    time.sleep(random.random() * 3)
    print 'Task %s end' % name
 
if __name__ == '__main__':
    print 'Current process %s.' % os.getpid()
    p = Pool(processes=3)
    for i in range(5):
        p.apply_async(run_task, args=(i,))
    print 'Waiting for all subprocesses done...'
    p.close()
    p.join()
    print 'All subprocesses done.'

執行結果如下:

Current process 9648.
Waiting for all subprocesses done...
Task 0 (pid = 17336) is running...
Task 1 (pid = 15792) is running...
Task 2 (pid = 13052) is running...
Task 0 end
Task 3 (pid = 17336) is running...
Task 1 end
Task 4 (pid = 15792) is running...
Task 2 end
Task 3 end
Task 4 end
All subprocesses done.

上述程式先建立了容量為3的程序池,依次向程序池中添加了5個任務。從執行結果中可以看到雖然添加了5個任務,但是一開始只運行了3個,而且每次最多執行3個程序。當一個任務結束了,新的任務依次新增進來,任務執行使用的程序依然是原來的程序,這一點通過程序的pid可以看出來。 注意:Pool物件呼叫join()放大會等待所有子程序執行完畢,呼叫join()方法之前必須先呼叫close()方法,呼叫close()方法之後就不能繼續新增新的Process了。

4、程序間通訊 Python提供了多種程序間通訊的方式,例如:Queue、Pipe、Value+Array等。Queue和Pipe的區別在於Pipe常用來在兩個程序間通訊,Queue用來在多個程序間實現通訊。

(1) Queue通訊 Queue是安全的佇列,可以使用Queue實現多程序之前的資料傳遞,有兩個辦法:Put和Get可以進行Queue操作:

Put方法用以插入資料到佇列中,它還有兩個可選引數:blocked和timeout。如果blocked為True(預設),並且timeout為正值,該方法會阻塞timeout指定的時間,直到該佇列有剩餘的空間。如果超時,會丟擲Queue.Full異常。如果blocked為False,但Queue已滿,會立即丟擲Queue.Full異常。 Get方法可以從佇列中讀取並且刪除一個元素,Get同樣有兩個可選引數:blocked和timeout。如果blocked為True(預設),並且timeout為正值,那麼在等待時間內沒有取到任務元素,會丟擲Queue.Empty異常。如果blocked為False,分兩種情況:如果Queue有一個值可用,則立即返回該值;否則,如果佇列為空,則立即丟擲Queue.Empty異常。 下面的例子為:在父程序中建立三個子程序,兩個子程序往Queue寫資料,一個子程序從Queue中讀取資料。

# -*- coding:UTF-8 -*-
 
from multiprocessing import Process, Queue
import os, time, random
 
#寫資料程序執行的程式碼
def proc_write(queue, urls):
    print 'Process(%s) is writing...', os.getpid()
    for url in urls:
        queue.put(url)
        print 'Put %s to queue...',  url
        time.sleep(random.random())
 
#讀資料程序執行的程式碼
def proc_read(queue):
    print 'Process(%s) is reading...', os.getpid()
    while True:
        url = queue.get(True)
        print 'Get %s from queue.', url
 
if __name__ == '__main__':
    #父程序建立Queue,並傳遞給子程序:
    queue = Queue()
    proc_write1 = Process(target=proc_write, args=(queue, ['url_1', 'url_2', 'url_3']))
    proc_write2 = Process(target=proc_write, args=(queue, ['url_4', 'url_5', 'url_6']))
    proc_reader = Process(target=proc_read, args=(queue,))
    #啟動子程序proc_write,寫入
    proc_write1.start()
    proc_write2.start()
    #啟動子程序proc_reader,讀取
    proc_reader.start()
    #等待proc_write結束
    proc_write1.join()
    proc_write2.join()
    #proc_reader程序裡是死迴圈,無法等待其結束,只能強行終止:
    proc_reader.terminate()

(2)Pipe通訊 Pipe常用來在兩個程序間進行通訊,兩個程序分別位於管道的兩端。 Piple方法返回(conn1, conn2)代表一個管道的兩端。Pipe方法由duplex引數,如果duplex引數為True(預設),那麼這個管道是全雙工模式,也就是說conn1和conn2均可收發。若dublex為False,conn1只負責接收訊息,conn2只負責傳送訊息。send和recv方法分別是傳送和接收訊息的方法。例如:在全雙工模式下,可以呼叫conn1.send傳送訊息,conn1.recv接收訊息。如果沒有訊息可接收,recv方法會一直阻塞。如果管道已經被關閉,那麼recv方法會丟擲EOFError。 下面通過一個例子進行說明:建立兩個程序,一個子程序通過Pipe傳送資料,一個子程序通過Pipe接收資料。示例如下:

# -*- coding:UTF-8 -*-
 
from multiprocessing import Process, Queue
import os, time, random
 
#寫資料程序執行的程式碼
def proc_write(queue, urls):
    print 'Process(%s) is writing...', os.getpid()
    for url in urls:
        queue.put(url)
        print 'Put %s to queue...',  url
        time.sleep(random.random())
 
#讀資料程序執行的程式碼
def proc_read(queue):
    print 'Process(%s) is reading...', os.getpid()
    while True:
        url = queue.get(True)
        print 'Get %s from queue.', url
 
if __name__ == '__main__':
    #父程序建立Queue,並傳遞給子程序:
    queue = Queue()
    proc_write1 = Process(target=proc_write, args=(queue, ['url_1', 'url_2', 'url_3']))
    proc_write2 = Process(target=proc_write, args=(queue, ['url_4', 'url_5', 'url_6']))
    proc_reader = Process(target=proc_read, args=(queue,))
    #啟動子程序proc_write,寫入
    proc_write1.start()
    proc_write2.start()
    #啟動子程序proc_reader,讀取
    proc_reader.start()
    #等待proc_write結束
    proc_write1.join()
    proc_write2.join()
    #proc_reader程序裡是死迴圈,無法等待其結束,只能強行終止:
    proc_reader.terminate()

二、多執行緒 Python的標準庫提供了兩個模組:thread和threading,thread是低階模組,threading是高階模組,對thread進行了封裝。絕大多數情況下,我們只需要使用threading這個高階模組。

1、使用threading模組建立多執行緒 threading模組一般通過兩種方式建立執行緒:

把函式傳入並建立Thread例項,然後呼叫start方法開始執行; 直接從threading.Thread繼承並建立執行緒類,然後重寫__init__方法和run方法 (1)通過把函式傳入建立Thread例項的方式建立多執行緒 程式碼如下:

# -*- coding:UTF-8 -*-
 
import random
import time, threading
 
#新執行緒執行的程式碼
def thread_run(urls):
    print 'Current %s is running...' % threading.current_thread().name
    for url in urls:
        print '%s ------->>> %s' % (threading.current_thread().name, url)
        time.sleep(random.random())
    print '%s ended.' % threading.current_thread().name
 
print '%s running...' % threading.current_thread().name
 
t1 = threading.Thread(target=thread_run, name='Thread_1', args=(['url_1', 'url_2', 'url_3'],))
t2 = threading.Thread(target=thread_run, name='Thread_2', args=(['url_4', 'url_5', 'url_6'],))
t1.start()
t2.start()
t1.join()
t2.join()
print '%s ended.' % threading.current_thread().name
(2)從threading.Thread繼承建立執行緒類
程式碼如下:

# -*- coding:UTF-8 -*-
 
import random
import threading
import time
 
class myThread(threading.Thread):
    def __init__(self, name, urls):
        threading.Thread.__init__(self, name=name)
        self.urls = urls
 
    def run(self):
        print 'Current %s running...' % threading.current_thread().name
        for url in self.urls:
            print '%s ------>>> %s' % (threading.current_thread().name, url)
            time.sleep(random.random())
        print '%s ended.' % threading.current_thread().name
 
print '%s is running...' % threading.current_thread().name
t1 = myThread(name='Thread_1', urls=['urls_1', 'url_2', 'url_3'])
t2 = myThread(name='Thread_2', urls=['urls_4', 'url_5', 'url_6'])
t1.start()
t2.start()
t1.join()
t2.join()
print '%s ended.' % threading.current_thread().name

2、執行緒同步 使用Thread物件的Lock和RLock可以實現簡單的執行緒同步,這兩個物件都有acquire方法和release方法,對於那些每次只允許一個執行緒操作的資料,可以將其操作放到acquire和release之間。 對於Lock而言,如果一個執行緒連續兩次進行acquire操作,那麼由於第一次acquire之後沒有release,第二次acquire將掛起執行緒。這會導致Lock物件永遠不會release,使得執行緒死鎖。RLock物件允許一個執行緒多次對其進行acquire操作,因為在其內部通過一個counter維護著執行緒acquire的次數。而且每一次的acquire操作必須有一個release操作與之對應,在所有的release操作完成後,別的執行緒才能申請該RLock物件。 程式碼如下:

# -*- coding:UTF-8 -*-
import threading
 
mylock = threading.RLock()
num = 0
class MyThread(threading.Thread):
    def __init__(self, name):
        threading.Thread.__init__(self, name=name)
 
    def run(self):
        global num
        while True:
            mylock.acquire()
            print '%s locked, Numver: %d' % (threading.current_thread().name, num)
            if num >= 4:
                mylock.release()
                print '%s released, Number: %d' % (threading.current_thread().name, num)
                break
            num += 1
            print '%s released, Number: %d' % (threading.current_thread().name, num)
            mylock.release()
 
if __name__ == '__main__':
    thread1 = MyThread('thread_1')
    thread2 = MyThread('thread_2')
    thread1.start()
    thread2.start()

3、全域性直譯器鎖(GIL) 在Python的原始直譯器CPython中存在著GIL(Global Interpreter Lock,全域性直譯器鎖),因此在解釋執行Python程式碼時,會產生互斥鎖來限制執行緒對共享資源的訪問,直到直譯器遇到I/O操作或者操作次數達到一定數目時才會釋放GIL。由於全域性直譯器鎖的存在,在進行多執行緒操作的時候,不能呼叫多個CPU核心,只能利用一個核心,所以在進行CPU密集型操作的時候,不推薦使用多執行緒,更加傾向於對程序。那麼多執行緒適合什麼樣的應用場景呢?對應IO密集型操作,多執行緒可以明顯提高效率,例如:Python爬蟲專案,絕大多數時間爬蟲實在等待socket返回資料,網路IO的操作延時比CPU大得多。 ---------------------  作者:liuyazhuang  來源:CSDN  原文:https://blog.csdn.net/l1028386804/article/details/83042246?utm_source=copy