1. 程式人生 > >Python 執行緒池原理及實現

Python 執行緒池原理及實現

概述

傳統多執行緒方案會使用“即時建立, 即時銷燬”的策略。儘管與建立程序相比,建立執行緒的時間已經大大的縮短,但是如果提交給執行緒的任務是執行時間較短,而且執行次數極其頻繁,那麼伺服器將處於不停的建立執行緒,銷燬執行緒的狀態。

一個執行緒的執行時間可以分為3部分:執行緒的啟動時間、執行緒體的執行時間和執行緒的銷燬時間。在多執行緒處理的情景中,如果執行緒不能被重用,就意味著每次建立都需要經過啟動、銷燬和執行3個過程。這必然會增加系統相應的時間,降低了效率。

使用執行緒池:
由於執行緒預先被建立並放入執行緒池中,同時處理完當前任務之後並不銷燬而是被安排處理下一個任務,因此能夠避免多次建立執行緒,從而節省執行緒建立和銷燬的開銷,能帶來更好的效能和系統穩定性。

1240
執行緒池原理圖.png

執行緒池模型

這裡使用建立Thread()例項來實現,下面會再用繼承threading.Thread()的類來實現

# 建立佇列例項, 用於儲存任務
queue = Queue()

# 定義需要執行緒池執行的任務
def do_job():
    while True:
        i = queue.get()
        time.sleep(1)
        print 'index %s, curent: %s' % (i, threading.current_thread())
        queue.task_done()

if __name__ == '__main__':
    # 建立包括3個執行緒的執行緒池
    for i in range(3):
        t = Thread(target=do_job)
        t.daemon=True # 設定執行緒daemon  主執行緒退出,daemon執行緒也會推出,即時正在執行
        t.start()

    # 模擬建立執行緒池3秒後塞進10個任務到佇列
    time.sleep(3)
    for i in range(10):
        queue.put(i)

    queue.join()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

輸出結果

index 1, curent: <Thread(Thread-2, started daemon 139652180764416)>
index 0, curent: <Thread(Thread-1, started daemon 139652189157120)>
index 2, curent: <Thread(Thread-3, started daemon 139652172371712)>
index 4, curent: <Thread(Thread-1, started daemon 139652189157120)>
index 3, curent: <Thread(Thread-2, started daemon 139652180764416)>
index 5, curent: <Thread(Thread-3, started daemon 139652172371712)>
index 6, curent: <Thread(Thread-1, started daemon 139652189157120)>
index 7, curent: <Thread(Thread-2, started daemon 139652180764416)>
index 8, curent: <Thread(Thread-3, started daemon 139652172371712)>
index 9, curent: <Thread(Thread-1, started daemon 139652189157120)>
finish
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

可以看到所有任務都是在這幾個執行緒中完成Thread-(1-3)

執行緒池原理

執行緒池基本原理: 我們把任務放進佇列中去,然後開N個執行緒,每個執行緒都去佇列中取一個任務,執行完了之後告訴系統說我執行完了,然後接著去佇列中取下一個任務,直至佇列中所有任務取空,退出執行緒。

上面這個例子生成一個有3個執行緒的執行緒池,每個執行緒都無限迴圈阻塞讀取Queue佇列的任務所有任務都只會讓這3個預生成的執行緒來處理。

具體工作描述如下:

  1. 建立Queue.Queue()例項,然後對它填充資料或任務
  2. 生成守護執行緒池,把執行緒設定成了daemon守護執行緒
  3. 每個執行緒無限迴圈阻塞讀取queue佇列的專案item,並處理
  4. 每次完成一次工作後,使用queue.task_done()函式向任務已經完成的佇列傳送一個訊號
  5. 主執行緒設定queue.join()阻塞,直到任務佇列已經清空了,解除阻塞,向下執行

這個模式下有幾個注意的點:

  • 將執行緒池的執行緒設定成daemon守護程序,意味著主執行緒退出時,守護執行緒也會自動退出,如果使用預設
    daemon=False的話, 非daemon的執行緒會阻塞主執行緒的退出,所以即使queue佇列的任務已經完成
    執行緒池依然阻塞無限迴圈等待任務,使得主執行緒也不會退出。

  • 當主執行緒使用了queue.join()的時候,說明主執行緒會阻塞直到queue已經是清空的,而主執行緒怎麼知道queue已經是清空的呢?就是通過每次執行緒queue.get()後並處理任務後,傳送queue.task_done()訊號,queue的資料就會減1,直到queue的資料是空的,queue.join()解除阻塞,向下執行。

  • 這個模式主要是以佇列queue的任務來做主導的,做完任務就退出,由於執行緒池是daemon的,所以主退出執行緒池所有執行緒都會退出。 有別於我們平時可能以佇列主導thread.join()阻塞,這種執行緒完成之前阻塞主執行緒。看需求使用哪個join():

    如果是想做完一定數量任務的佇列就結束,使用queue.join(),比如爬取指定數量的網頁
    如果是想執行緒做完任務就結束,使用thread.join()
    • 1
    • 2

示例:使用執行緒池寫web伺服器

import socket
import threading
from threading import Thread
import threading
import sys
import time
import random
from Queue import Queue

host = ''
port = 8888
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((host, port))
s.listen(3)

class ThreadPoolManger():
    """執行緒池管理器"""
    def __init__(self, thread_num):
        # 初始化引數
        self.work_queue = Queue()
        self.thread_num = thread_num
        self.__init_threading_pool(self.thread_num)

    def __init_threading_pool(self, thread_num):
        # 初始化執行緒池,建立指定數量的執行緒池
        for i in range(thread_num):
            thread = ThreadManger(self.work_queue)
            thread.start()

    def add_job(self, func, *args):
        # 將任務放入佇列,等待執行緒池阻塞讀取,引數是被執行的函式和函式的引數
        self.work_queue.put((func, args))

class ThreadManger(Thread):
    """定義執行緒類,繼承threading.Thread"""
    def __init__(self, work_queue):
        Thread.__init__(self)
        self.work_queue = work_queue
        self.daemon = True

    def run(self):
        # 啟動執行緒
        while True:
            target, args = self.work_queue.get()
            target(*args)
            self.work_queue.task_done()

# 建立一個有4個執行緒的執行緒池
thread_pool = ThreadPoolManger(4)

# 處理http請求,這裡簡單返回200 hello world
def handle_request(conn_socket):
    recv_data = conn_socket.recv(1024)
    reply = 'HTTP/1.1 200 OK \r\n\r\n'
    reply += 'hello world'
    print 'thread %s is running ' % threading.current_thread().name
    conn_socket.send(reply)
    conn_socket.close()

# 迴圈等待接收客戶端請求
while True:
    # 阻塞等待請求
    conn_socket, addr = s.accept()
    # 一旦有請求了,把socket扔到我們指定處理函式handle_request處理,等待執行緒池分配執行緒處理
    thread_pool.add_job(handle_request, *(conn_socket, ))

s.close()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
# 執行程序
[master][/data/web/advance_python/socket]$ python sock_s_threading_pool.py 

# 檢視執行緒池狀況
[master][/data/web/advance_python/socket]$ ps -eLf|grep sock_s_threading_pool
lisa+ 27488 23705 27488  0    5 23:22 pts/30   00:00:00 python sock_s_threading_pool.py
lisa+ 27488 23705 27489  0    5 23:22 pts/30   00:00:00 python sock_s_threading_pool.py
lisa+ 27488 23705 27490  0    5 23:22 pts/30   00:00:00 python sock_s_threading_pool.py
lisa+ 27488 23705 27491  0    5 23:22 pts/30   00:00:00 python sock_s_threading_pool.py
lisa+ 27488 23705 27492  0    5 23:22 pts/30   00:00:00 python sock_s_threading_pool.py

# 跟我們預期一樣一共有5個執行緒,一個主執行緒,4個執行緒池執行緒
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

這個執行緒池web伺服器編寫框架包括下面幾個組成部分及步驟:

  • 定義執行緒池管理器ThreadPoolManger,用於建立並管理執行緒池,提供add_job()介面,給執行緒池加任務
  • 定義工作執行緒ThreadManger, 定義run()方法,負責無限迴圈工作佇列,並完成佇列任務
  • 定義socket監聽請求s.accept() 和處理請求 handle_requests() 任務。
  • 初始化一個4個執行緒的執行緒池,都阻塞等待這讀取佇列queue的任務
  • 當socket.accept()有請求,則把conn_socket做為引數,handle_request方法,丟給執行緒池,等待執行緒池分配執行緒處理

GIL 對多執行緒的影響

因為Python的執行緒雖然是真正的執行緒,但直譯器執行程式碼時,有一個GIL鎖:Global Interpreter Lock,任何Python執行緒執行前,必須先獲得GIL鎖,然後,每執行100條位元組碼,直譯器就自動釋放GIL鎖,讓別的執行緒有機會執行。這個GIL全域性鎖實際上把所有執行緒的執行程式碼都給上了鎖,所以,多執行緒在Python中只能交替執行,即使100個執行緒跑在100核CPU上,也只能用到1個核。

但是對於IO密集型的任務,多執行緒還是起到很大效率提升,這是協同式多工
當一項任務比如網路 I/O啟動,而在長的或不確定的時間,沒有執行任何 Python 程式碼的需要,一個執行緒便會讓出GIL,從而其他執行緒可以獲取 GIL 而執行 Python。這種禮貌行為稱為協同式多工處理,它允許併發;多個執行緒同時等待不同事件。

兩個執行緒在同一時刻只能有一個執行 Python ,但一旦執行緒開始連線,它就會放棄 GIL ,這樣其他執行緒就可以執行。這意味著兩個執行緒可以併發等待套接字連線,這是一件好事。在同樣的時間內它們可以做更多的工作。

執行緒池要設定為多少?

伺服器CPU核數有限,能夠同時併發的執行緒數有限,並不是開得越多越好,以及執行緒切換是有開銷的,如果執行緒切換過於頻繁,反而會使效能降低

執行緒執行過程中,計算時間分為兩部分:

  • CPU計算,佔用CPU
  • 不需要CPU計算,不佔用CPU,等待IO返回,比如recv(), accept(), sleep()等操作,具體操作就是比如
    訪問cache、RPC呼叫下游service、訪問DB,等需要網路呼叫的操作

那麼如果計算時間佔50%, 等待時間50%,那麼為了利用率達到最高,可以開2個執行緒:
假如工作時間是2秒, CPU計算完1秒後,執行緒等待IO的時候需要1秒,此時CPU空閒了,這時就可以切換到另外一個執行緒,讓CPU工作1秒後,執行緒等待IO需要1秒,此時CPU又可以切回去,第一個執行緒這時剛好完成了1秒的IO等待,可以讓CPU繼續工作,就這樣迴圈的在兩個執行緒之前切換操作。

那麼如果計算時間佔20%, 等待時間80%,那麼為了利用率達到最高,可以開5個執行緒:
可以想象成完成任務需要5秒,CPU佔用1秒,等待時間4秒,CPU線上程等待時,可以同時再啟用4個執行緒,這樣就把CPU和IO等待時間,最大化的重疊起來

抽象一下,計算執行緒數設定的公式就是:
N核伺服器,通過執行業務的單執行緒分析出本地計算時間為x,等待時間為y,則工作執行緒數(執行緒池執行緒數)設定為 N*(x+y)/x,能讓CPU的利用率最大化。
由於有GIL的影響,python只能使用到1個核,所以這裡設定N=1