1. 程式人生 > >Python 多執行緒、多程序 (二)之 多執行緒、同步、通訊

Python 多執行緒、多程序 (二)之 多執行緒、同步、通訊

Python 多執行緒、多程序 (一)之 原始碼執行流程、GIL
Python 多執行緒、多程序 (二)之 多執行緒、同步、通訊
Python 多執行緒、多程序 (三)之 執行緒程序對比、多執行緒

一、python多執行緒

對於I/O操作的時候,程序與執行緒的效能差別不大,甚至由於執行緒更輕量級,效能更高。這裡的I/O包括網路I/O和檔案I/O

1、例項

假如利用socket傳送http請求,也就是網路I/O。爬取列表網頁中的寫href連結,然後獲取href連結之後,在爬去連結的網頁詳情。
如果不適用多執行緒的話,程式序列的執行,結果就是要先等待列表網頁獲取所有的href的連結之後,才可以逐個的爬去href連結所指的網頁詳情,這就使得等待時間很長。
如果使用多執行緒程式設計,執行緒A執行第一個列表網頁程式,遇到I/O操作,GIL釋放,當獲取到第一個href連結之後,執行緒B就自動的去獲取href連結所指的網頁詳情。

2、多執行緒實現

使用sleep模擬網路I/IO

# test3.py

import time
import threading

def get_detail_html(url):
    print("get detail html started")
    time.sleep(2)
    print("get detail html end")

def get_detail_url(url):
    print("get detail url started")
    time.sleep(4)
    print("get detail url end")

if  __name__ == "__main__":

    # 函式方法 arg 為函式引數
    thread1 = threading.Thread(target=get_detail_html, args=("",))
    thread2 = threading.Thread(target=get_detail_url, args=("",))

    start_time = time.time()

    # 子執行緒1,2開始
    thread1.start()  
    thread2.start()

    print ("last time: {}".format(time.time()-start_time))

# 執行結果
get detail html started
get detail url started
last time: 0.0019958019256591797  # 忽略為0
get detail html end
get detail url end

按照上面執行緒並行執行的邏輯應該是列印時間為2秒,但是結果卻為0。
任何程序預設就會啟動一個執行緒,該執行緒稱為主執行緒,主執行緒又可以啟動新的執行緒。上面的thread1與thread2就是主執行緒啟動的兩個新的執行緒,那麼在兩個子執行緒啟動之後,主執行緒中其餘的程式段print函式也在並行執行,所以時間為0。當兩個子執行緒執行完畢之後,主執行緒退出,程序關閉,程式執行結束。才會打印出get detail html end,get detail url end。

3、守護執行緒

那麼如何使得主執行緒退出的時候子執行緒也退出。或者說,主執行緒推出的時候kill掉子執行緒?

<1>、將子執行緒設定成守護執行緒

# test4.py

import time
import threading

def get_detail_html(url):
    print("get detail html started")
    time.sleep(2)
    print("get detail html end")


def get_detail_url(url):
    print("get detail url started")
    time.sleep(4)
    print("get detail url end")

if  __name__ == "__main__":
    # 函式方法 arg 為函式引數
    thread1 = threading.Thread(target=get_detail_html, args=("",))
    thread2 = threading.Thread(target=get_detail_url, args=("",))

    thread1.setDaemon(True)
    thread2.setDaemon(True)
    # 將兩個執行緒設定為守護執行緒,即主執行緒退出,這兩個子執行緒也退出,kill

    start_time = time.time()
     # 子程開始
    thread1.start() 
    thread2.start()

    #當主執行緒退出的時候, 子執行緒kill掉
    print ("last time: {}".format(time.time()-start_time))

# 輸出
get detail html started
get detail url started
last time: 0.0

將兩個執行緒設定為守護執行緒,即主執行緒退出,這兩個守護執行緒也退出。列印結果中執行到print之後直接程式結束。

由於兩個執行緒的時間不相同,那麼兩者有什麼區別呢

<2>、先將thread1設定為守護執行緒

# test5.py

import time
import threading

def get_detail_html(url):
    print("get detail html started")
    time.sleep(2)
    print("get detail html end")


def get_detail_url(url):
    print("get detail url started")
    time.sleep(4)
    print("get detail url end")


if  __name__ == "__main__":
    # 函式方法 arg 為函式引數
    thread1 = threading.Thread(target=get_detail_html, args=("",))
    thread2 = threading.Thread(target=get_detail_url, args=("",))

    thread1.setDaemon(True)  # 只將thread設定為守護執行緒
    # thread2.setDaemon(True)

    start_time = time.time()

    thread1.start()  
    thread2.start()

    #當主執行緒退出的時候, 子執行緒kill掉
    print ("last time: {}".format(time.time()-start_time))

# 結果 
get detail html started
get detail url started
last time: 0.000997781753540039
get detail html end
get detail url end

只將thread1設定為守護執行緒之後,由於thread2的sleep時間為4秒,所以主執行緒仍會等待thread2執行結束之後才退出,而thread1由於時間為2秒,所以也會列印。

<3>、先將thread2設定為守護執行緒

# test6.py

import time
import threading

def get_detail_html(url):
    print("get detail html started")
    time.sleep(2)
    print("get detail html end")


def get_detail_url(url):
    print("get detail url started")
    time.sleep(4)
    print("get detail url end")


if  __name__ == "__main__":

    # 函式方法 arg 為函式引數
    thread1 = threading.Thread(target=get_detail_html, args=("",))
    thread2 = threading.Thread(target=get_detail_url, args=("",))

    # thread1.setDaemon(True)
    thread2.setDaemon(True)

    start_time = time.time()

    thread1.start()  
    thread2.start()

    print ("last time: {}".format(time.time()-start_time))

# 輸出
get detail html started
get detail url started
last time: 0.0029969215393066406
get detail html end

由於只將thread2設定為守護執行緒,print函式執行結束的時候會首先kill掉thread2執行緒。但是由於thread1執行緒還未結束,程式仍會等待兩秒輸出get detail html end才結束。

4、執行緒阻塞

上面說了如何在主執行緒結束的時候,直接kill掉子執行緒。那麼如何使子執行緒執行結束才執行主執行緒,就是阻塞主程序。

<1>、結束兩個子執行緒

# test7.py

import time
import threading

def get_detail_html(url):
    print("get detail html started")
    time.sleep(2)
    print("get detail html end")

def get_detail_url(url):
    print("get detail url started")
    time.sleep(4)
    print("get detail url end")

if  __name__ == "__main__":

    # 函式方法 arg 為函式引數
    thread1 = threading.Thread(target=get_detail_html, args=("",))
    thread2 = threading.Thread(target=get_detail_url, args=("",))

    start_time = time.time()

    # 子執行緒開始
    thread1.start()
    thread2.start()

    # 子執行緒程結束
    thread1.join()
    thread2.join()

    #當主執行緒退出的時候, 子執行緒kill掉
    print ("last time: {}".format(time.time()-start_time))

#輸出
get detail html started
get detail url started
get detail html end
get detail url end
last time: 4.001712799072266

由於呼叫了兩個thread的join方法,主執行緒阻塞,當子執行緒結束之後,print函式執行後主執行緒退出,程式結束。

<2>、結束thread1執行緒

# test8.py

import time
import threading

def get_detail_html(url):
    print("get detail html started")
    time.sleep(2)
    print("get detail html end")


def get_detail_url(url):
    print("get detail url started")
    time.sleep(4)
    print("get detail url end")

if  __name__ == "__main__":

    # 函式方法 arg 為函式引數
    thread1 = threading.Thread(target=get_detail_html, args=("",))
    thread2 = threading.Thread(target=get_detail_url, args=("",))

    start_time = time.time()

    # 子執行緒開始
    thread1.start()
    thread2.start()

    # 1執行緒程結束
    thread1.join()
    # thread2.join()

    #當主執行緒退出的時候, 子執行緒kill掉
    print ("last time: {}".format(time.time()-start_time))

# 輸出
get detail html started
get detail url started
get detail html end
last time: 2.001251220703125
get detail url end

由於呼叫了thread1的join方法,阻塞主執行緒,thread1直接結束之後print列印時間,但是對另一個執行緒沒有影響。所以在列印last time: 2.001251220703125時間,等待兩秒列印get detail url end,主執行緒才會退出。

<3>、結束thread2執行緒

# test9.py

import time
import threading

def get_detail_html(url):
    print("get detail html started")
    time.sleep(2)
    print("get detail html end")


def get_detail_url(url):
    print("get detail url started")
    time.sleep(4)
    print("get detail url end")

if  __name__ == "__main__":

    # 函式方法 arg 為函式引數
    thread1 = threading.Thread(target=get_detail_html, args=("",))
    thread2 = threading.Thread(target=get_detail_url, args=("",))

    start_time = time.time()

    # 子執行緒開始
    thread1.start()
    thread2.start()

    # 2執行緒程結束
    # thread1.join()
    thread2.join()

    #當主執行緒退出的時候, 子執行緒kill掉
    print ("last time: {}".format(time.time()-start_time))

# 輸出
get detail html started
get detail url started
get detail html end
get detail url end
last time: 4.002287864685059

由於thread2執行緒的sleep的時間為4秒,期間thread1已經執行完畢,所以列印時間為4秒。

5、Thread類繼承式建立

同樣的也可以使用類繼承的方法建立執行緒例項,效果一樣的

# test10.py

import time
import threading

class GetDetailHtml(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        print("get detail html started")
        time.sleep(2)
        print("get detail html end")


class GetDetailUrl(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        print("get detail url started")
        time.sleep(4)
        print("get detail url end")

if  __name__ == "__main__":

    # 類繼承方法
    thread1 = GetDetailHtml("get_detail_html")
    thread2 = GetDetailUrl("get_detail_url")
    start_time = time.time()

    # 子執行緒開始
    thread1.start()
    thread2.start()

    # 子執行緒程結束
    thread1.join()
    thread2.join()

    #當主執行緒退出的時候, 子執行緒kill掉
    print ("last time: {}".format(time.time()-start_time))

二、執行緒通訊

1、共享變數通訊

共享變數通訊,是執行緒間通訊最簡單的方式,但也是最容易出問題的方式。以上面爬去頁面和網頁連結的例項進行擴充套件。在上面的例項中,因為要解決請求列表頁面的時候網路時延問題,引入了多執行緒並行,邊爬去列表頁獲取href,再爬取href指向的想起那個頁面,下面將爬去的頁面存入列表實現。

# test11.py

import threading
import time

detail_url_list = []  # 儲存著爬取下來的href連結

def get_detail_html(detail_url_list):  # 引數這裡作為對全域性變數的引用
    while True:
        # 使用while語句使得執行緒持續爬去
        if len(detail_url_list):
            url = detail_url_list.pop()
            print('get detail html start')
            time.sleep(2)
            print('get detail html end')


def get_detail_url(detail_url_list):
    while True:
        # 使用while語句使得執行緒持續爬取
        print('get detail url start')
        time.sleep(4)

        for i in range(20):
            detail_url_list.append('http://www.xxxx.com/{}.html'.format(i))
        print('get detail end')


if __name__ == "__main__":
    thread_detail_url = threading.Thread(target=get_detail_url, args=(detail_url_list,))

    for i in range(10):
        # 為了模擬多個執行緒併發,這裡建立了十個子執行緒
        html_thread = threading.Thread(target=get_detail_html, args=(detail_url_list,))
        html_thread.start()

    start_time = time.time()

    print("last time: {}".format(time.time() - start_time))

但是上面問題也會很明顯,在GIL的示例中,total變數由於變數共享的緣故,沒有按照預期的執行。而在上面的爬蟲例項中,detail_url_list作為全域性共享變數,pop操作,append操作,多個執行緒共用資源,都不是執行緒安全的操作,會出現問題。所以就必須給變數加上鎖,保持安全性。為了擺脫這種問題,使用訊息佇列通訊

2、訊息佇列通訊

訊息佇列通訊也就是使用Queue這個類來表示變數,從而達到執行緒安全,由於Queue這個類內部封裝了deque,也就是python中的雙端佇列。雙端對列本身就是安全界別很高的一種型別,實現執行緒間的安全操作。

# test12.py

#通過queue的方式進行執行緒間同步
from queue import Queue
import time
import threading

def get_detail_html(queue):
    #爬取文章詳情頁
    while True:
        url = queue.get()
        print("get detail html started")
        time.sleep(2)
        print("get detail html end")


def get_detail_url(queue):
    # 爬取文章列表頁
    while True:
        print("get detail url started")
        time.sleep(4)
        for i in range(20):
            queue.put("http://projectsedu.com/{id}".format(id=i))
        print("get detail url end")

if  __name__ == "__main__":
    detail_url_queue = Queue(maxsize=1000)

    thread_detail_url = threading.Thread(target=get_detail_url, args=(detail_url_queue,))
    for i in range(10):
        html_thread = threading.Thread(target=get_detail_html, args=(detail_url_queue,))
        html_thread.start()

    start_time = time.time()

    # detail_url_queue.task_done()
    detail_url_queue.join()

    #當主執行緒退出的時候, 子執行緒kill掉
    print ("last time: {}".format(time.time()-start_time))

使用了訊息佇列替代共享變數

  • Queue([maxsize]):建立共享的程序佇列,Queue是多程序安全的佇列,可以使用Queue實現多程序之間的資料傳遞。
  • q.put方法用以插入資料到佇列中,put方法還有兩個可選引數:blocked和timeout。如果blocked為True(預設值),並且timeout為正值,該方法會阻塞timeout指定的時間,直到該佇列有剩餘的空間。如果超時,會丟擲Queue.Full異常。如果blocked為False,但該Queue已滿,會立即丟擲Queue.Full異常。沒有引數時,q.put的個數大於佇列數時,會一直阻塞住。
  • q.get方法可以從佇列讀取並且刪除一個元素。同樣,get方法有兩個可選引數:blocked和timeout。如果blocked為True(預設值),並且timeout為正值,那麼在等待時間內沒有取到任何元素,會丟擲Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果佇列為空,則立即丟擲Queue.Empty異常。沒有引數時,q.get的個數大於佇列數時,會一直阻塞住。
  • q.put_nowait()等價於q.put(block=False)佇列滿時再存也會拋異常
  • q.get_nowait()等價於q.get(block=False)佇列為空取不出時會拋異常
  • q.task_done():使用者使用此方法發出訊號,表示q.get()的返回專案已經被處理。如果呼叫此方法的次數大於從佇列中刪除專案的數量,將引發ValueError異常
  • q.join():生產者呼叫此方法進行阻塞,直到佇列中所有的專案均被處理。阻塞將持續到佇列中的每個專案均呼叫q.task_done()方法為止

三、執行緒同步

1、加鎖

在上面的第一個GIL示例中,由於GIL釋放的緣故,多個執行緒共享變數,導致total的值不像預期那樣為0的問題發生,也就是如何執行緒同步。最簡單的方式就是加鎖。加鎖使得一個執行緒在佔用資源的時候,別的執行緒都必須等待,只有當這個執行緒主動釋放資源的時候,其他執行緒才能使用資源,也就是資源佔用互斥。這樣就可要保證共享變數的安全性。

# test13.py

from threading import Lock

#在同一個執行緒裡面,可以連續呼叫多次acquire, 一定要注意acquire的次數要和release的次數相等
total = 0
lock = Lock()

def add():
    global lock
    global total
    for i in range(1000000):
        lock.acquire()   # 加鎖
        total += 1
        lock.release()  # 釋放鎖


def desc():
    global total
    global lock
    for i in range(1000000):
        lock.acquire()  # 加鎖
        total -= 1
        lock.release()  # 釋放鎖

import threading

thread1 = threading.Thread(target=add)
thread2 = threading.Thread(target=desc)

thread1.start()
thread2.start()

thread1.join()
thread2.join()

print(total)

# 輸出
在等待了一段時間後輸出0
0 # total的列印結果為0

加鎖的時候要保證加上鎖執行完成之後,就要釋放掉,不然會一直佔用資源。

加鎖的結果使得在執行total-=1或者total+=1的賦值語句的時候,該賦值語句對應的多條位元組碼指令執行完之後,才會其他程序執行修改total值。該執行緒佔用了鎖,所以其他執行緒不能修改total值,只有當該釋放了鎖,其他執行緒才能修改total值,不會造成修改共享變數的衝突。這是加鎖的好處,那麼代價也十分明顯
加鎖缺點:

  • 加鎖效能
  • 死鎖風險

補充:另外自己加的鎖使使用者級別的與GIL不同。

<1>、效能問題

本來的多執行緒,由於加鎖的緣故,首先是阻止了多執行緒併發執行,包含鎖的某段程式碼實際上只能以單執行緒模式執行。並且由於來回切換執行緒的緣故,程式效能變得低下
將test2.py改成如下

# test14.py

total = 0

def add():
    global total
    for i in range(1000000):
        total += 1
def desc():
    global total
    for i in range(1000000):
        total -= 1

import threading
import time
start_time = time.time()

add()
desc()

print(total)

print("last time: {}".format(time.time() - start_time))

# 輸出
0
last time: 0.314816951751709

這是簡單的單執行緒程式,持續時間為0.3秒。沒有使用thread多執行緒

下面使用threading多執行緒,並且加鎖

# test15.py

from threading import Lock  

total = 0
lock = Lock()

def add():
    global lock
    global total
    for i in range(1000000):
        lock.acquire()
        total += 1
        lock.release()


def desc():
    global total
    global lock
    for i in range(1000000):
        lock.acquire()
        total -= 1
        lock.release()

import threading
import time

start_time = time.time()

thread1 = threading.Thread(target=add)
thread2 = threading.Thread(target=desc)

thread1.start()
thread2.start()

thread1.join()
thread2.join()

print(total)
print("last time: {}".format(time.time() - start_time))

# 輸出
0
last time: 5.062084674835205

使用了多執行緒,為了保證共享變數的安全性操作,執行緒同步,加鎖導致類似單執行緒,程式的執行時間達到了5秒鐘。可見執行緒之間的切換十分浪費時間。所以說,CPython的GIL本意是用來保護所有全域性的直譯器和環境狀態變數的,如果去掉GIL,就需要更多的更細粒度的鎖對直譯器的眾多全域性狀態進行保護。做過測試將GIL去掉,加入更細粒度的鎖。但是實踐檢測對單執行緒來說,效能更低。

<2>、死鎖風險

來看下面例子
這裡為了在一個執行緒中多次呼叫lock,使用可重入的鎖Rlock物件
Lock與Rlock區別
RLock允許在同一執行緒中被多次acquire。而Lock卻不允許這種情況。注意:如果使用RLock,那麼acquire和release必須成對出現,即呼叫了n次acquire,必須呼叫n次的release才能真正釋放所佔用的瑣。

# test15.py

from threading import RLock  # 可重入的鎖

total = 0
lock = RLock()

def add():
    global lock
    global total
    for i in range(1000000):
        lock.acquire()
        lock.acquire()  # 這裡加了兩次鎖
        total += 1
        lock.release()


def desc():
    global total
    global lock
    for i in range(1000000):
        lock.acquire()
        total -= 1
        lock.release()

import threading
import time

start_time = time.time()

thread1 = threading.Thread(target=add)
thread2 = threading.Thread(target=desc)

thread1.start()
thread2.start()

thread1.join()
thread2.join()

print(total)
print("last time: {}".format(time.time() - start_time))

由於在add函式中加了兩次鎖lock.acquire(),結果就是執行緒永遠都不獲釋放掉共享變數。一直佔用資源,其他的執行緒請求資源沒有結果,多個執行緒掛起,既不能執行,也無法結束,一直處於等待狀態,造成死鎖,只能靠作業系統強制終止。最終程式也沒有任何結果輸出。
所以在同一個執行緒裡面,可以連續呼叫多次acquire, 一定要注意acquire的次數要和release的次數相等

還有就是,執行緒的相互等待,假如記憶體中又兩中資源a和b,而執行緒A(a,b)和執行緒B(a,b)都申請資源。

第一步
執行緒A先申請a資源,執行緒B先申請b資源,因此沒有問題

第二步
由於a,b均已被A,B佔用,並且A申請b,B申請b,在位獲得新的資源的時候兩者都不會退出對現有資源的佔用,這就造成了兩個執行緒相互等待,並且這種等待會一直持續下去,造成死鎖。

2、執行緒複雜通訊

在上面看到執行緒進行通訊的時候需要加鎖,如果如何使用鎖進行執行緒的對話功能,例如

  • 執行緒A:hello,你好啊
  • 執行緒B:你好
  • 執行緒A:吃飯了嗎
  • 執行緒B:吃過了,你呢
  • 執行緒A:我也吃過了,咱們去搞PVM吧
  • 執行緒B:ok,走吧

<1>、簡單鎖

像上面的執行緒通訊,如果使用簡單的Rlock鎖

import threading


class ThreadA(threading.Thread):
    def __init__(self, lock):
        super().__init__(name="執行緒A")
        self.lock = lock

    def run(self):
        self.lock.acquire()
        print("{} : hello, 你好 ".format(self.name))
        self.lock.release()
        self.lock.acquire()
        print("{} : 吃過飯了嗎 ".format(self.name))
        self.lock.release()
        self.lock.acquire()
        print("{} : 我也吃過了,咱們去找PVM吧".format(self.name))
        self.lock.release()

class ThreadB(threading.Thread):
    def __init__(self, lock):
        super().__init__(name="執行緒B")
        self.lock = lock

    def run(self):
        self.lock.acquire()
        print("{} : 你好 ".format(self.name))
        self.lock.release()
        self.lock.acquire()
        print("{} : 吃過了,你呢".format(self.name))
        self.lock.release()
        self.lock.acquire()
        print("{} : ok,走吧 ".format(self.name))
        self.lock.release()

if __name__ == "__main__":

    lock = threading.RLock()

    a_thread = ThreadA(lock)
    b_thread = ThreadB(lock)

    a_thread.start()
    b_thread.start()

# 輸出
執行緒A : hello, 你好 
執行緒A : 吃過飯了嗎 
執行緒A : 我也吃過了,咱們去找PVM吧
執行緒B : 你好 
執行緒B : 吃過了,你呢
執行緒B : ok,走吧 

顯然沒有完成執行緒通訊的基本功能。

<2>、threading.Condition()

解決方案:線上程複雜通訊時使用threading.Condition(),可以把Condiftion理解為一把高階的瑣,它提供了比Lock, RLock更高階的功能,允許我們能夠控制複雜的執行緒同步問題。threadiong.Condition在內部維護一個瑣物件(預設是RLock),可以在建立Condigtion物件的時候把瑣物件作為引數傳入。Condition也提供了acquire, release方法,其含義與瑣的acquire, release方法一致,其實它只是簡單的呼叫內部瑣物件的對應的方法而已。Condition還提供wait方法、notify方法、notifyAll方法。這些方法只有在佔用瑣(acquire)之後才能呼叫,否則將會報RuntimeError異常。

方法介紹

  • acquire()/release():獲得/釋放 Lock
  • wait([timeout]):執行緒掛起,直到收到一個notify通知或者超時(可選的,浮點數,單位是秒s)才會被喚醒繼續執行。wait()必須在已獲得Lock前提下才能呼叫,否則會觸發RuntimeError。呼叫wait()會釋放Lock,直至該執行緒被Notify()、NotifyAll()或者超時執行緒又重新獲得Lock.
  • notify(n=1):通知其他執行緒,那些掛起的執行緒接到這個通知之後會開始執行,預設是通知一個正等待該condition的執行緒,最多則喚醒n個等待的執行緒。notify()必須在已獲得Lock前提下才能呼叫,否則會觸發RuntimeError。notify()不會主動釋放Lock。
  • notifyAll(): 如果wait狀態執行緒比較多,notifyAll的作用就是通知所有執行緒

原始碼分析

# 部分原始碼

_PyRLock = _RLock


class Condition:

    def __init__(self, lock=None):
        if lock is None:
            lock = RLock()
        self._lock = lock
        # Export the lock's acquire() and release() methods
        self.acquire = lock.acquire
        self.release = lock.release

    def __enter__(self):
        return self._lock.__enter__()

    def __exit__(self, *args):
        return self._lock.__exit__(*args)

進入Condition這個類中檢視原始碼發現,在預設的情況下,Condition是封裝的鎖物件是Rlock,另外Condition類實現了__enter__,__exit__兩個特殊方法,由鴨子型別可知,說明可以像上下文管理器一樣使用它。
而在__enter__與__exit__兩個特殊方法中分別呼叫了self.acquire()與self.release()兩個方法,所以說不使用with上下文管理器的話也可以直接使用acquire()與release()兩個方法進行加鎖釋放鎖。

解決例項

class ThreadA(threading.Thread):
    def __init__(self, cond):
        super().__init__(name="執行緒A")
        self.cond = cond

    def run(self):
        with self.cond:
            print("{} : hello, 你好 ".format(self.name))  # 4
            self.cond.notify()  # 5
            self.cond.wait()  # 6

            print("{} : 吃過飯了嗎 ".format(self.name))
            self.cond.notify()
            self.cond.wait()

            print("{} : 我也吃過了,咱們去找PVM吧".format(self.name))
            self.cond.notify()
            self.cond.wait()


class ThreadB(threading.Thread):
    def __init__(self, cond):
        super().__init__(name="執行緒B")
        self.cond = cond

    def run(self):
        with self.cond:
            self.cond.wait()  # 2
            print("{} : 你好 ".format(self.name))  # 7
            self.cond.notify()

            self.cond.wait()
            print("{} : 吃過了,你呢".format(self.name))
            self.cond.notify()

            self.cond.wait()
            print("{} : ok,走吧 ".format(self.name))
            self.cond.notify()


if __name__ == "__main__":

    cond = threading.Condition()

    b_thread = ThreadB(cond)
    a_thread = ThreadA(cond)

    b_thread.start()  # 1
    a_thread.start()  # 3

# 輸出結果

執行緒A : hello, 你好 
執行緒B : 你好 
執行緒A : 吃過飯了嗎 
執行緒B : 吃過了,你呢
執行緒A : 我也吃過了,咱們去找PVM吧
執行緒B : ok,走吧 

完成執行緒之間的複雜通訊。
這裡需要注意的是:兩個執行緒之間的開啟先後順序。b執行緒需要先於a執行緒開啟。原因:
1 先開啟b執行緒
2 wait方法會首先上一把鎖,執行緒處於阻塞態
3 開啟a執行緒
4 列印 執行緒A:hello,你好啊
5 這個時候cond物件呼叫notify方法,會釋放掉之前上的鎖
6 呼叫wait方法,為自己又上了一把鎖
7 由於notify方法已經打開了鎖,或繼續執行,列印 執行緒B:你好
其實wait方法會維持一個鎖,而這個鎖只有notify方法才能開啟。如果a執行緒先開啟,則是呼叫了wait方法維持了一把鎖,並沒有其他的執行緒會呼叫notify方法釋放這把鎖。則最終只會輸出 執行緒A : hello, 你好 ,而執行緒一直處於死鎖狀態。

補充:Condition物件會維持兩層鎖,而不是兩個鎖,更不是簡單的一個鎖。在開啟或者關閉上下文管理器物件的時候__enter__,__exit__方法會開啟釋放掉底層鎖(直接使用acquire()與release()兩個方法也行),這一層鎖是一個。而在持續連續呼叫的wait和notify方法則是對第二層鎖進行操作,而這一層所在Condition物件內部是封裝到一個雙端佇列中,在每次呼叫wait的時候分配一把鎖並放入到cond的等待佇列中,等到notify方法的喚醒。可以進入Condition原始碼檢視

3、Semaphore(訊號量)

同時只有n個執行緒可以獲得semaphore,即可以限制最大連線數為n),也就是執行緒最大併發量的控制。
Semaphore管理一個內建的計數器,每當呼叫acquire()時內建計數器-1;呼叫release() 時內建計數器+1;計數器不能小於0;當計數器為0時,acquire()將阻塞執行緒直到其他執行緒呼叫release()。
訊號量使得一個程式中有很多個執行緒,但是隻有n多個執行緒獲得訊號量,處於執行態

class HtmlSpider(threading.Thread):
    def __init__(self, url, sem):
        super().__init__()
        self.url = url
        self.sem = sem

    def run(self):
        time.sleep(2)
        print("got html text success, time is {}".format(time.ctime()))
        self.sem.release()

class UrlProducer(threading.Thread):
    def __init__(self, sem):
        super().__init__()
        self.sem = sem

    def run(self):
        for i in range(20):
            self.sem.acquire()
            html_thread = HtmlSpider("https://baidu.com/{}".format(i), self.sem)
            html_thread.start()

if __name__ == "__main__":
    sem = threading.Semaphore(4)  # 每次只有4個執行緒獲取訊號量
    url_producer = UrlProducer(sem)
    url_producer.start()

在上面示例中,模擬爬蟲,建立20個子執行緒爬取html頁面,如果不是用訊號量,二十條資料一次返回。使用訊號量,使得每次只有4個執行緒執行。

# 輸出結果

got html text success, time is Tue Nov 20 17:17:55 2018
got html text success, time is Tue Nov 20 17:17:55 2018
got html text success, time is Tue Nov 20 17:17:55 2018
got html text success, time is Tue Nov 20 17:17:55 2018
got html text success, time is Tue Nov 20 17:17:57 2018
got html text success, time is Tue Nov 20 17:17:57 2018
got html text success, time is Tue Nov 20 17:17:57 2018
got html text success, time is Tue Nov 20 17:17:57 2018
got html text success, time is Tue Nov 20 17:17:59 2018
got html text success, time is Tue Nov 20 17:17:59 2018
got html text success, time is Tue Nov 20 17:17:59 2018
got html text success, time is Tue Nov 20 17:17:59 2018
got html text success, time is Tue Nov 20 17:18:01 2018
got html text success, time is Tue Nov 20 17:18:01 2018
got html text success, time is Tue Nov 20 17:18:01 2018
got html text success, time is Tue Nov 20 17:18:01 2018
got html text success, time is Tue Nov 20 17:18:03 2018
got html text success, time is Tue Nov 20 17:18:03 2018
got html text success, time is Tue Nov 20 17:18:03 2018
got html text success, time is Tue Nov 20 17:18:03 2018

每個兩秒列印一次結果,一次四條資料。總共二十個。

4、執行緒池

Python標準庫為我們提供了threading和multiprocessing模組編寫相應的多執行緒/多程序程式碼,但是當專案達到一定的規模,頻繁建立/銷燬程序或者執行緒是非常消耗資源的,這個時候我們就要編寫自己的執行緒池/程序池,以空間換時間。但從Python3.2開始,標準庫為我們提供了concurrent.futures模組,它提供了ThreadPoolExecutor和ProcessPoolExecutor兩個類,實現了對threading和multiprocessing的進一步抽象,對編寫執行緒池/程序池提供了直接的支援。

concurrent.futures模組的基礎是Exectuor,Executor是一個抽象類,它不能被直接使用。但是它提供的兩個子類ThreadPoolExecutor和ProcessPoolExecutor卻是非常有用,顧名思義兩者分別被用來建立執行緒池和程序池的程式碼。我們可以將相應的tasks直接放入執行緒池/程序池,不需要維護Queue來操心死鎖的問題,執行緒池/程序池會自動幫我們排程。

Future你可以把它理解為一個在未來完成的操作,這是非同步程式設計的基礎,傳統程式設計模式下比如我們操作queue.get的時候,在等待返回結果之前會產生阻塞,cpu不能讓出來做其他事情,而Future的引入幫助我們在等待的這段時間可以完成其他的操作。

<1>、使用submit來操作執行緒池/程序池:

from concurrent.futures import ThreadPoolExecutor
import urllib.request
URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']
def load_url(url):
    with urllib.request.urlopen(url, timeout=60) as conn:
        print('%r page is %d bytes' % (url, len(conn.read())))

executor = ThreadPoolExecutor(max_workers=3)

for url in URLS:
    future = executor.submit(load_url,url)
    print(future.done())

print('主執行緒')

# 執行結果:
False
False
False
主執行緒
'https://www.baidu.com/' page is 227 bytes
'https://github.com/' page is 75633 bytes
'http://www.163.com' page is 703974 bytes

根據執行結果,使用submit方法來往執行緒池中加入一個task,submit返回一個Future物件,對於Future物件可以簡單地理解為一個在未來完成的操作。由於執行緒池非同步提交了任務,主執行緒並不會等待執行緒池裡建立的執行緒執行完畢,所以執行了print('主執行緒'),相應的執行緒池中建立的執行緒並沒有執行完畢,故future.done()返回結果為False。

<2>、 用map來操作執行緒池/程序池:

from concurrent.futures import ThreadPoolExecutor
import urllib.request
URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']
def load_url(url):
    with urllib.request.urlopen(url, timeout=60) as conn:
        print('%r page is %d bytes' % (url, len(conn.read())))

executor = ThreadPoolExecutor(max_workers=3)

executor.map(load_url,URLS)

print('主執行緒')
# 結果
主執行緒
'https://www.baidu.com/' page is 227 bytes
'https://github.com/' page is 75633 bytes
'http://www.163.com' page is 703974 bytes

從執行結果可以看出,map是按照URLS列表元素的順序返回的,並且寫出的程式碼更加簡潔直觀,可以根據具體的需求任選一種。

<3>、wait

wait方法接會返回一個tuple(元組),tuple中包含兩個set(集合),一個是completed(已完成的)另外一個是uncompleted(未完成的)。使用wait方法的一個優勢就是獲得更大的自由度,它接收三個引數FIRST_COMPLETED, FIRST_EXCEPTION 和ALL_COMPLETE,預設設定為ALL_COMPLETED

如果採用預設的ALL_COMPLETED,程式會阻塞直到執行緒池裡面的所有任務都完成,再執行主執行緒:

from concurrent.futures import ThreadPoolExecutor,wait,as_completed
import urllib.request
URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']
def load_url(url):
    with urllib.request.urlopen(url, timeout=60) as conn:
        print('%r page is %d bytes' % (url, len(conn.read())))

executor = ThreadPoolExecutor(max_workers=3)

f_list = []
for url in URLS:
    future = executor.submit(load_url,url)
    f_list.append(future)
print(wait(f_list))

print('主執行緒')
# 輸出
'https://www.baidu.com/' page is 227 bytes
'https://github.com/' page is 75627 bytes
'http://www.163.com' page is 703988 bytes
DoneAndNotDoneFutures(done={<Future at 0x2ab6ea89d30 state=finished returned NoneType>, <Future at 0x2ab6ea89240 state=finished returned NoneType>, <Future at 0x2ab6e93f7b8 state=finished returned NoneType>}, not_done=set())
主執行緒

如果採用FIRST_COMPLETED引數,程式並不會等到執行緒池裡面所有的任務都完成:

from concurrent.futures import ThreadPoolExecutor,wait,as_completed
import urllib.request
URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']
def load_url(url):
    with urllib.request.urlopen(url, timeout=60) as conn:
        print('%r page is %d bytes' % (url, len(conn.read())))

executor = ThreadPoolExecutor(max_workers=3)

f_list = []
for url in URLS:
    future = executor.submit(load_url,url)
    f_list.append(future)
print(wait(f_list,return_when='FIRST_COMPLETED'))

print('主執行緒')
# 輸出
'https://www.baidu.com/' page is 227 bytes
DoneAndNotDoneFutures(done={<Future at 0x2cd5581a240 state=finished returned NoneType>}, not_done={<Future at 0x2cd5581ad30 state=running>, <Future at 0x2cd556cf7f0 state=running>})
主執行緒
'http://www.163.com' page is 703991 bytes
'https://github.com/' page is 75625 bytes

<4>、回撥函式

import requests
import time
from concurrent.futures import ThreadPoolExecutor


def get(url):
    print('GET {}'.format(url))
    response = requests.get(url)
    time.sleep(2)
    if response.status_code == 200:  # 200代表狀態:下載成功了
        return {'url': url, 'content': response.text}


def parse(res):
    print('%s parse res is %s' % (res['url'], len(res['content'])))
    return '%s parse res is %s' % (res['url'], len(res['content']))


def save(res):
    print('save', res)


def task(res):
    res = res.result()
    par_res = parse(res)
    save(par_res)


if __name__ == '__main__':
    urls = [
        'http://www.cnblogs.com',
        'https://www.python.org',
        'https://www.openstack.org',
    ]

    pool = ThreadPoolExecutor(2)
    for i in urls:
        pool.submit(get, i).add_done_callback(task)
        '''
        這裡的回撥函式拿到的是一個物件。得
        先把返回的res得到一個結果。即在前面加上一個res.result() 
        誰好了誰去掉回撥函式
        回撥函式也是一種程式設計思想。不僅線上程池用,在程序池也用
        '''
    pool.shutdown()  # 相當於程序池裡的close和join

# 輸出
GET http://www.cnblogs.com
GET https://www.python.org
https://www.python.org parse res is 50114
save https://www.python.org parse res is 50114
GET https://www.openstack.org
https://www.openstack.org parse res is 63253
save https://www.openstack.org parse res is 63253
http://www.cnblogs.com parse res is 40382
save http://www.cnblogs.com parse res is 40382