1. 程式人生 > >多執行緒、多程序、執行緒池、程序池

多執行緒、多程序、執行緒池、程序池

多工

不管是單核CPU還是多核CPU,一旦任務數量超過核數,OS都會把每個任務輪流排程到每個核心上。OS實現多程序和多執行緒往往是通過時間片的形式執行的,即讓每個任務(程序/執行緒)輪流交替執行,因為時間片切分的很小,以至於我們感覺多個任務在同時執行。

如果我們要同時執行多個任務怎麼辦?

主要有兩種解決方案:

  • 一種是啟動多個程序,每個程序雖然只有一個執行緒,但多個程序可以一塊執行多個任務。

  • 還有一種方法是啟動一個程序,在一個程序內啟動多個執行緒,這樣,多個執行緒也可以一塊執行多個任務。

  • 當然還有第三種方法,就是啟動多個程序,每個程序再啟動多個執行緒,這樣同時執行的任務就更多了,當然這種模型更復雜,實際很少採用。

總結一下就是,多工的實現有3種方式:

多程序模式
多執行緒模式
多程序+多執行緒模式

同時執行多個任務通常各個任務之間並不是沒有關聯的,而是需要相互通訊和協調,有時,任務1必須暫停等待任務2完成後才能繼續執行,有時,任務3和任務4又不能同時執行,所以,多程序和多執行緒的程式的複雜度要遠遠高於單程序單執行緒的程式。

執行緒是最小的執行單元,而程序由至少一個執行緒組成。

如何排程程序和執行緒,完全由作業系統決定,程式自己不能決定什麼時候執行,執行多長時間。

多執行緒

一個程序內可以有多個執行緒(至少一個主執行緒),多個執行緒共享該程序的所有變數,同時對全域性變數進行訪問和改寫很容易出現混亂,所以需要用鎖進行執行緒的同步控制。python由於設計時有GIL全域性鎖,所以多執行緒無法利用多核CPU,也不存在多執行緒併發的問題。

Python標準庫中提供了_thread和threading兩個模組來實現多執行緒,threading是_thread的高階封裝,因此一般只需要import threading即可。

import time, threading

def loop():
    print('thread %s is running...' % threading.current_thread().name)
    n = 0
    while n < 5:
        n = n+1
        print('thread %s >>> %s' % (threading.current_thread().name, n))
        time.sleep(1)
    print('thread %s is end.' % threading.current_thread().name)

print('thread %s is running...' % threading.current_thread().name)
t = threading.Thread(target=loop, name='LoopThread')
t.start()
t.join()
print('thread %s is end.' % threading.current_thread().name)
#output
thread MainThread is running...
thread LoopThread is running...
thread LoopThread >>> 1
thread LoopThread >>> 2
thread LoopThread >>> 3
thread LoopThread >>> 4
thread LoopThread >>> 5
thread LoopThread is end.
thread MainThread is end.

其實,多執行緒和多程序的實現很類似,程序啟動時預設的執行緒為主執行緒,主執行緒之外可以開啟新的子執行緒,current_thread()函式返回當前執行緒的例項。

多程序

Linux/Unix作業系統提供了fork()系統呼叫,fork執行一次會返回兩次,分別在父程序和子程序內返回(先父程序再子程序),父程序返回的是子程序的ID,子程序返回0。父程序要記下所有子程序的ID,而子程序可以通過getppid()獲得父進的ID。

在python中,可以通過from mutilprocessing import Process來建立程序類物件:

p = Process(target = run_proc, args = (‘args1’,…))

其中,run_proc是子程序函式,args是傳入子程序函式的引數,用tuple格式傳遞.

  • p.start()–開始執行子程序(函式)
  • p.join()–用來等待該程序結束,用於程序同步
    舉個例子:
from multiprocessing import Process
import os

def run_proc(name):
    print 'Run child process %s (%s)' % (name, os.getpid())

if __name__ == '__main__':
    print 'Parent process %s.' % os.getpid()
    p1 = Process(target=run_proc, args=('test1',))
    p2 = Process(target=run_proc, args=('test2',))
    p3 = Process(target=run_proc, args=('test3',))
    p4 = Process(target=run_proc, args=('test4',))
    print 'Process will start.'
    p1.start()
    p2.start()
    p3.start()
    p4.start()
    p1.join()
    p2.join()
    p3.join()
    p4.join()

在主程序中建立了4個子程序,但是子程序的建立順序是由OS決定的,並不是按照程式中的順序執行。

Parent process 21410.
Process will start.
Run child process test4 (24745)
Run child process test1 (24742)
Run child process test2 (24743)
Run child process test3 (24744)

Parent process 21410.
Process will start.
Run child process test1 (24754)
Run child process test4 (24759)
Run child process test2 (24755)
Run child process test3 (24756)

程序池和執行緒池

程序池和執行緒池性質相似,下面以執行緒池為例進行說明。

既然是“池”,我們很容易聯想到“水池”,在使用水的過程中,水池起到了一個緩衝的作用,避免了頻繁開關水龍頭。同理,程序池也是通過事先劃分一塊系統資源區域,這組資源區域在伺服器啟動時就已經建立和初始化,使用者如果想建立新的程序,可以直接取得資源,從而避免了動態分配資源(這是很耗時的)。

執行緒池內子程序的數目一般在3~10個之間,子執行緒都執行著相同的程式碼,並具有相同的屬性,如優先順序,PGID等。
當有新的任務來到時,主程序將通過某種方式選擇程序池中的某一個子程序來為之服務。相比於動態建立子程序,選擇一個已經存在的子程序的代價顯得小得多。至於主程序選擇哪個子程序來為新任務服務,則有兩種方法:

  • 1)主程序使用某種演算法來主動選擇子程序。最簡單、最常用的演算法是隨機演算法和 Round Robin (輪流演算法)。

  • 2)主程序和所有子程序通過一個共享的工作佇列來同步,子程序都睡眠在該工作佇列上。當有新的任務到來時,主程序將任務新增到工作佇列中。這將喚醒正在等待任務的子程序,不過只有一個子程序將獲得新任務的“接管權”,它可以從工作佇列中取出任務並執行之,而其他子程序將繼續睡眠在工作佇列上。

當選擇好子程序後,主程序還需要使用某種通知機制來告訴目標子程序有新任務需要處理,並傳遞必要的資料。最簡單的方式是,在父程序和子程序之間預先建立好一條管道,然後通過管道來實現所有的程序間通訊。在父執行緒和子執行緒之間傳遞資料就要簡單得多,因為我們可以把這些資料定義為全域性,那麼它們本身就是被所有執行緒共享的。

最後用程式碼實現一下程序池:

from multiprocessing import Pool
import os, time, random

def long_time_task(name):
    print 'Run task %s (%s)...' % (name, os.getpid())
    start = time.time()
    time.sleep(random.random()*3)
    end = time.time()
    print 'Task %s runs %0.2f seconds.' % (name, end-start)

if __name__ == '__main__':
    print 'Parent process %s.' % os.getpid()
    p = Pool()
    for i in range(5):
        p.apply_async(long_time_task, (i,))
    print 'Waiting for all subprocesses done...'
    p.close()
    p.join()
    print 'All subprocesses done.'
#ouput
Parent process 21410.
Run task 2 (25238)...
Run task 1 (25235)...
Run task 3 (25237)...
Run task 0 (25233)...
Waiting for all subprocesses done...
Task 1 runs 0.36 seconds.
Run task 4 (25235)...
Task 0 runs 0.91 seconds.
Task 3 runs 1.05 seconds.
Task 2 runs 2.78 seconds.
Task 4 runs 2.72 seconds.
All subprocesses done.

python中Pool物件中預設子程序數量為4,因此任務4要等到程序池中某個子程序執行完後才能執行。當然可以自己設定Pool中子程序的數目。對Pool物件呼叫join()方法會等待所有子程序執行完畢,呼叫join()之前必須先呼叫close(),呼叫close()之後就不能繼續新增新的Process了。

  • apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞的–非同步
  • apply(func[, args[, kwds]])是阻塞的-同步

記憶體池

既然說了程序池和執行緒池,也不妨再瞭解下記憶體池,記憶體池和程序池的工作方式也很類似,都是以空間換取時間。

記憶體池是一種記憶體分配方式。通常我們習慣直接使用new、malloc等系統呼叫申請分配記憶體,這樣做的缺點在於:由於所申請記憶體塊的大小不定,當頻繁使用時會造成大量的記憶體碎片並進而降低效能。

記憶體池則是在真正使用記憶體之前,先申請分配一定數量的、大小相等(一般情況下)的記憶體塊留作備用。當有新的記憶體需求時,就從記憶體池中分出一部分記憶體塊,若記憶體塊不夠再繼續申請新的記憶體。這樣做的一個顯著優點是,使得記憶體分配效率得到提升。