1. 程式人生 > >Python 實用程式設計技巧(多執行緒篇)

Python 實用程式設計技巧(多執行緒篇)

一、GIL(global_interpreter_lock)

1.概念:

Python 一開始為了簡單,在多執行緒程式設計的時候會在我們的直譯器上加一個非常大的鎖,也就是允許我們一次只有一個執行緒執行在一個CPU上,gil 就能實現在同一時刻只有一個執行緒在CPU上執行位元組碼(目的當然是保證執行緒安全),當然他的效能也是非常讓人詬病,因為他也無法將多個執行緒對映到多個CPU上(體現不出多核CPU的優勢)

解釋: python 在執行程式之前會將其先編譯成位元組碼,我們可以使用 dis.dis() 這個函式對某個函式進行反編譯

示例程式碼:

import dis
def a():
    a = 1
    a +=1
    return a

print dis.dis(a)

結果:

  4           0 LOAD_CONST               1 (1)
              3 STORE_FAST               0 (a)

  5           6 LOAD_FAST                0 (a)
              9 LOAD_CONST               1 (1)
             12 INPLACE_ADD         
             13 STORE_FAST               0 (a)

  6          16 LOAD_FAST                0 (a)
             19 RETURN_VALUE        
None

2.GPL 真的那麼安全嗎:

實際上在程執行過程中GPL 是會在一定時候釋放的,並不是一個執行完了才會執行另一個程式

示例程式碼:

import threading

total = 0
def add():
    global total
    for i in xrange(100000):
         total += 1

def desc():
    global total
    for i in xrange(100000):
        total -= 1

thread1 = threading.Thread(target = add)
thread2 = threading.Thread(target = desc)

thread1.start()
thread2.start()
thread1.join()
thread2.join()

print total

上面這個程式你每執行一次得到而結果都不一樣,說明並不是執行完一個再執行另一個,也就是說GIL會在中間釋放,交給另一個執行緒。

那麼一般在什麼情況下釋放?

一般是在執行一定行數的位元組碼或者執行一定時間,或者遇到I/O操作(因為I/O操作的時間非常長,程式不會等待)

二、多執行緒程式設計

方法一:通過Thread類例項化

我們可以以一個爬蟲的例子來理解,我們可以用一個執行緒來爬去某網站的列表頁,另一個執行緒根據這個列表來爬去詳情頁,兩者同時進行效率很高(為什麼能同時進行呢?因為我們的socket程式設計也是屬於網路I/O程式設計的一種)

示例程式碼:

import time
import threading

def show_html_detail(url):
    print "get html detail start\n"
    time.sleep(2)
    print "get html detail end\n"

def show_url_detail(url):
    print "get url detail start\n"
    time.sleep(2)
    print "get url detail end\n"

if __name__ == '__main__':

    thread1 = threading.Thread(target=show_url_detail,args=("",))
    thread2 = threading.Thread(target=show_html_detail,args=("",))
    start_time = time.time()
    thread1.start()
    thread2.start()
    print "last time {}\n".format(time.time()-start_time)

結果:

get url detail start

get html detail start
last time 0.0


get html detail end
get url detail end

解釋:

為什麼是0s 呢?如果是並行的應該是兩秒,其實是因為我們這裡有三個執行緒,其中有一個是主執行緒,這裡是主執行緒和子執行緒同時執行,,主執行緒執行結束以後就列印了這句話,子執行緒繼續執行,輸出了後面的 end

需求一:

那麼我們其實希望主執行緒執行完(和退出不一樣)後子執行緒也一起結束,我們可以設定子執行緒為守護執行緒,即thread.steDaemon(True),守護執行緒會在主執行緒結束後跟隨主執行緒結束(主執行緒結束:主執行緒所在的程序中的所有非守護執行緒都結束)

示例程式碼:

import time
import threading

def show_html_detail(url):
    print "get html detail start\n"
    time.sleep(2)
    print "get html detail end\n"

def show_url_detail(url):
    print "get url detail start\n"
    time.sleep(2)
    print "get url detail end\n"

if __name__ == '__main__':

    thread1 = threading.Thread(target=show_url_detail,args=("",))
    thread2 = threading.Thread(target=show_html_detail,args=("",))
    start_time = time.time()
    thread1.setDaemon(True)
    thread2.setDaemon(True)
    thread1.start()
    thread2.start()
    print "last time {}\n".format(time.time()-start_time)

結果:

get url detail start

get html detail start

last time 0.0

可見,end 並沒有輸出來,說明子執行緒在主執行緒結束時被強制結束了

我們嘗試關閉一個守護執行緒並延長另一個守護執行緒的時間再次驗證一下

示例程式碼:

import time
import threading

def show_html_detail(url):
    print "get html detail start\n"
    time.sleep(2)
    print "get html detail end\n"

def show_url_detail(url):
    print "get url detail start\n"
    time.sleep(4)                       #注意這裡對守護執行緒的時間進行了修改
    print "get url detail end\n"

if __name__ == '__main__':

    thread1 = threading.Thread(target=show_url_detail,args=("",))
    thread2 = threading.Thread(target=show_html_detail,args=("",))
    start_time = time.time()
    thread1.setDaemon(True)
    thread1.start()
    thread2.start()
    print "last time {}\n".format(time.time()-start_time)

結果:

get url detail start

get html detail start
last time 0.00100016593933

get html detail end

可以看到守護執行緒並沒有輸出,我們的分析是正確的

需求二:

我們其實希望我們子執行緒和主執行緒不要併發執行,主執行緒等等子執行緒執行結束再執行,於是Python也給我們提供的對應的方法,join() 主執行緒阻塞方法,使用以後主執行緒就會被阻塞等待子執行緒執行

示例程式碼:

import time
import threading

def show_html_detail(url):
    print "get html detail start\n"
    time.sleep(2)
    print "get html detail end\n"

def show_url_detail(url):
    print "get url detail start\n"
    time.sleep(2)
    print "get url detail end\n"

if __name__ == '__main__':

    thread1 = threading.Thread(target=show_url_detail,args=("",))
    thread2 = threading.Thread(target=show_html_detail,args=("",))
    start_time = time.time()
    #thread1.setDaemon(True)
    #thread2.setDaemon(True)
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()
    print "last time {}\n".format(time.time()-start_time)

結果:

get url detail start

get html detail start

get url detail end

get html detail end

last time 2.00100016594

解釋:

可以看到我們的確在2s 左右執行結束了兩個執行緒,實現了併發,(如果我們將其中一個執行緒的時間設定為4那麼我們最後的時間將會是4s左右,也就是多個執行緒的最常時間)

方法二:通過繼承Thread類實現多執行緒

import time
import threading

class Get_html_detail(threading.Thread):
    def __init__(self,name):
        super(Get_html_detail,self).__init__(name = name)

    def run(self):
        print "get html detail start\n"
        time.sleep(2)
        print "get html detail end\n"

class Get_url_dtail(threading.Thread):
    def __init__(self, name):
        super(Get_url_dtail,self).__init__(name = name)

    def run(self):
        print "get url detail start\n"
        time.sleep(2)
        print "get url detail end\n"


if __name__ == '__main__':

    thread1 = Get_url_dtail("show_url_detail")
    thread2 = Get_html_detail("show_html_detail")
    start_time = time.time()
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()
    print "last time {}\n".format(time.time()-start_time)

三、執行緒間通訊

為什麼需要執行緒間的通訊呢?

我們上面說過,在爬蟲的爬取URL執行緒爬取成功以後,我們需要將爬取到的執行緒丟給另一個爬取文章詳情頁的執行緒,這時候就需要使用到 Python的執行緒間通訊

方法一:共享變數

設定一個全域性變數實現多個執行緒之間的通訊

import time
import threading

detail_url_list = []

def show_html_detail():
    global detail_url_list
    url = detail_url_list.pop() # 注意這裡沒有在函式中寫迴圈,而是選擇在主函式中國實現迴圈多執行緒提高效率
    print "get html detail start\n"
    time.sleep(2)
    print "get html detail end\n"

def show_url_detail():
    global detail_url_list
    print "get url detail start\n"
    time.sleep(2)
    for i in xrange(20):
        detail_url_list.append("http://www.threadtest.com/{id}".format(id = i))
    print "get url detail end\n"

if __name__ == '__main__':
    thread1 = threading.Thread(target=show_url_detail)
    thread1.start()
    thread1.join()
    for i in xrange(20):
        thread2 = threading.Thread(target=show_html_detail)
        thread2.start()
        thread2.join()

實際上可以直接使用傳參的方式:

import time
import threading

detail_url_list = []

def show_html_detail(detail_url_list):
    url = detail_url_list.pop()
    print "get html detail start\n"
    time.sleep(2)
    print "get html detail end\n"

def show_url_detail(detail_url_list):
    print "get url detail start\n"
    time.sleep(2)
    for i in xrange(20):
        detail_url_list.append("http://www.threadtest.com/{id}".format(id = i))
    print "get url detail end\n"

if __name__ == '__main__':

    thread1 = threading.Thread(target=show_url_detail,args=(detail_url_list,))
    start_time = time.time()
    thread1.start()
    thread1.join()
    for i in xrange(20):
        thread2 = threading.Thread(target=show_html_detail,args=(detail_url_list,))
        thread2.start()
        thread2.join()
    print "last time {}\n".format(time.time() - start_time)

但是使用Pop 方法其實並不是執行緒安全的,我們還是需要在上面加一把鎖

方法二:執行緒佇列

示例程式碼:

import Queue
import time
import threading


def show_html_detail(queue):
    url = queue.get()#這是一個阻塞的操作,當佇列為空的時候會阻塞,所以我們不用 join
    print "get html detail start\n"
    time.sleep(2)
    print "get html detail end\n"


def show_url_detail(queue):
    print "get url detail start\n"
    time.sleep(2)
    for i in xrange(20):
        queue.put("http://www.threadtest.com/{id}".format(id=i))
    print "get url detail end\n"


if __name__ == '__main__':
    detail_url_queue = Queue(maxsize=1000)
    thread1 = threading.Thread(target=show_url_detail, args=(detail_url_queue,))
    start_time = time.time()
    thread1.start()
    for i in xrange(20):
        thread2 = threading.Thread(target=show_html_detail, args=(detail_url_queue,))
        thread2.start()
    print "last time {}\n".format(time.time() - start_time)

四、執行緒同步

1.為什麼要執行緒同步?

我們用之前的一個例子給大家分析一下

示例程式碼:

import dis

def add(a):
    a += 1

def desc(a):
    a -= 1

print dis.dis(add)
print dis.dis(desc)

結果:

  5           0 LOAD_FAST                0 (a)
              3 LOAD_CONST               1 (1)
              6 INPLACE_ADD         
              7 STORE_FAST               0 (a)
             10 LOAD_CONST               0 (None)
             13 RETURN_VALUE        
None
  8           0 LOAD_FAST                0 (a)
              3 LOAD_CONST               1 (1)
              6 INPLACE_SUBTRACT    
              7 STORE_FAST               0 (a)
             10 LOAD_CONST               0 (None)
             13 RETURN_VALUE        
None

解釋:

可以發現要執行這兩個函式實際上在底層運行了四個步驟

1.load a 
2.load 1
3.+/-操作
4.賦值給a 

那麼在這個過程中GIL隨時都能進行切換,就會造成最後賦值的混亂,這種場景在web 開發,特別是電商網站的開發中出現的概率非常大,因為很多人會同時減庫存

2.怎樣實現執行緒同步

(1)加鎖

python 為我們提供了一些鎖的機制,最常見的就是 from threading import Lock

示例程式碼:

# coding=utf-8
import threading
from threading import Lock

total = 0
lock = Lock()  # 宣告一把鎖

def add():
    global total
    global lock

    for i in xrange(100000):
        lock.acquire()
        total += 1
        lock.release() #一定要去釋放這個鎖,要不然程式就無法繼續運行了


def desc():
    global total
    global lock
    for i in xrange(100000):
        lock.acquire()
        total -= 1
        lock.release()


thread1 = threading.Thread(target=add)
thread2 = threading.Thread(target=desc)

thread1.start()
thread2.start()
thread1.join()
thread2.join()

print total

結果:

0       

解釋:

這樣無論怎麼執行結果都是0了,有了鎖以後我們就能實現程式碼段的交替執行,但是鎖會影響效能

(2)鎖會引起死鎖

如果我們在 acquire() 裡面再次 acquire() 就會引起互相的等待造成死鎖,因此我們使用鎖的時候要格外的小心

(3)可重入的鎖 RLock

有了這個鎖我們就能在一個執行緒裡面呼叫多次 acquire() (注意:呼叫 acquire()的此時一定要和 release()保持一致)

示例程式碼:

# coding=utf-8
import threading
from threading import RLock

total = 0

lock = RLock()  # 宣告一把鎖

def add():
    global total
    global lock

    for i in xrange(100000):
        lock.acquire()
        lock.acquire()
        total += 1
        lock.release() #一定要去釋放這個鎖,要不然程式就無法繼續運行了
        lock.release()

def desc():
    global total
    global lock
    for i in xrange(100000):
        lock.acquire()
        total -= 1
        lock.release()


thread1 = threading.Thread(target=add)
thread2 = threading.Thread(target=desc)

thread1.start()
thread2.start()
thread1.join()
thread2.join()

print total

五、多執行緒中的condition

condition—> 條件變數,用於複雜的執行緒間同步

1.Lock 能否實現協同讀詩?

一般來講這個東西的使用是在完成類似對話的操作的時候,比如天貓精靈和小愛的對話,但是這個你一句我一句的形式感覺 lock 也能完成,我們先實驗一下.

示例程式碼:

# coding=utf-8
import threading
from threading import Lock

class XiaoAi(threading.Thread):
    def __init__(self,lock):
        super(XiaoAi,self).__init__(name = "小愛")
        self.lock = lock
    def run(self):
        self.lock.acquire()
        print "{name}:哎".format(name = "小愛")
        self.lock.release()

        self.lock.acquire()
        print "{name}:好啊".format(name="小愛")
        self.lock.release()


class TianMao(threading.Thread):
    def __init__(self,lock):
        super(TianMao,self).__init__(name = "天貓精靈")
        self.lock = lock
    def run(self):
        self.lock.acquire()
        print "{name}:小愛同學".format(name = "天貓精靈")
        self.lock.release()

        self.lock.acquire()
        print "{name}:我們來對古詩吧".format(name = "天貓精靈")
        self.lock.release()

if __name__ == '__main__':
    lock = Lock()
    xiaoai = XiaoAi(lock)
    tianmao = TianMao(lock)
    tianmao.start()
    xiaoai.start()

結果:

天貓精靈:小愛同學
天貓精靈:我們來對古詩吧
小愛:哎
小愛:好啊

我們發現,天貓精靈把話都說了然後才給了小愛,這並不是我們想要的結果

2.condition 實現協同操作

condition 的鎖的機制實際上還是使用的是 RLock 中的 acquire() 和 release() ,而condition中的 wait 和 notify 才是他的精髓所在

示例程式碼:

# coding=utf-8
import threading
from threading import Condition

class XiaoAi(threading.Thread):
    def __init__(self,cond):
        super(XiaoAi,self).__init__(name = "小愛")
        self.cond = cond
    def run(self):
        with self.cond:
            self.cond.wait()
            print "{name}:哎".format(name = "小愛")
            self.cond.notify()

            self.cond.wait()
            print "{name}:好啊".format(name="小愛")
            self.cond.notify()


class TianMao(threading.Thread):
    def __init__(self,cond):
        super(TianMao,self).__init__(name = "天貓精靈")
        self.cond = cond
    def run(self):
        with self.cond:
            print "{name}:小愛同學".format(name = "天貓精靈")
            self.cond.notify()
            self.cond.wait()

            print "{name}:我們來對古詩吧".format(name = "天貓精靈")
            self.cond.notify()
            self.cond.wait()

if __name__ == '__main__':
    cond = Condition()
    xiaoai = XiaoAi(cond)
    tianmao = TianMao(cond)
    xiaoai.start()            # 注意這裡的順序和之前的程式碼不同
    tianmao.start()

結果:

天貓精靈:小愛同學
小愛:哎
天貓精靈:我們來對古詩吧
小愛:好啊

解釋:

1.使用 condition 的時候執行的先後順序是一定不能弄錯的,否則就會出現執行不下去的情況,比如我們先啟動天貓精靈,那麼我們的 Notify 就已經在小愛沒有啟動的時候就發出去了,小愛就永遠收不到天貓的請求,於是對話就陷入的僵局

2.使用了 with 語句,就相當於呼叫了兩個魔法方法,就不用單獨寫 self.cond.acquire() 和 self.cond.release() 了。(必須在 呼叫了 with 以後才能呼叫 self.cond.wait() 和 self.cond.notify())

六、可控數量的執行緒同步機制:Semaphore

Semaphore(訊號量) 是另一個執行緒同步機制,是一種用於控制進入數量的鎖

什麼時候會用到這個機制呢?

1.比如我們做爬蟲,我們想控制執行緒的最大數量,這樣就能防止被反爬 2.比如我們想要控制對一個檔案的讀取執行緒的數量不能超過某一個值

沒有使用前的例子

程式碼示例:

import threading
import time

class HtmlSpider(threading.Thread):
    def __init__(self,url):
        super(HtmlSpider,self).__init__()
        self.url = url

    def run(self):
        time.sleep(2)
        print "got html successfully\n"


class UrlProducer(threading.Thread):
    def run(self):
        for i in xrange(20):
            html_thread = HtmlSpider("http://www.html_spider.com/{id}".format(id = i))
            html_thread.start()


if __name__ == '__main__':
    url_producer = UrlProducer()
    url_producer.start()

結果:

got html successfully
got html successfully
got html successfully
got html successfully
got html successfully
got html successfully
got html successfully
got html successfully
got html successfully
got html successfully
got html successfully
got html successfully
got html successfully
got html successfully
got html successfully
got html successfully
got html successfully
got html successfully
got html successfully
got html successfully

這裡現在有20個執行緒,但是如果我們有這樣的需求,我們希望每併發的數量不超過3個,那麼我們就要使用到Semaphore

程式碼示例:

# coding=utf-8
import threading
import time

class HtmlSpider(threading.Thread):
    def __init__(self,url,sem):
        super(HtmlSpider,self).__init__()
        self.url = url
        self.sem = sem

    def run(self):
        time.sleep(2)
        print "got html successfully\n"
        self.sem.release() # 在這裡釋放---------------->注意一定要在執行完全結束以後釋放

class UrlProducer(threading.Thread):
    def __init__(self,sem):
        super(UrlProducer,self).__init__()
        self.sem = sem

    def run(self):
        for i in xrange(20):
            self.sem.acquire() # 在這裡上鎖
            html_thread = HtmlSpider("http://www.html_spider.com/{id}".format(id = i),self.sem)
            html_thread.start()
            ## self.sem.release()  # 在這裡釋放 ----------->這裡釋放還是20個併發,因為沒在執行完畢的時候釋放,start()執行後還要等函式執行完畢

if __name__ == '__main__':
    sem = threading.Semaphore(3)

    url_producer = UrlProducer(sem)
    url_producer.start()

解釋:

Semaphore 底層是呼叫condition 實現的,sem.acquire() 會記錄數量,滿三個就上鎖,解鎖一個數字就減一

七、執行緒池concurrent.futures(Python 3)

1.為什麼要用執行緒池?

執行緒池能夠自動的幫助我們進行執行緒的排程,而不用我們手動控制,我們有新的執行緒就丟到執行緒池中就可以了,減輕了我們的負擔,非常方便

比如: 1.我們想在主執行緒中獲取某一個執行緒或者任務的狀態或者返回值 2.當一個執行緒完成的時候主執行緒能立刻的知道 3.futures 可以讓多執行緒和多程序的編碼介面一致

小試牛刀

示例程式碼:

from concurrent.futures import ThreadPoolExecutor
import time

def get_html(times):
    time.sleep(times)
    print("get html {id} successfully".format(id = times))
    return times

if __name__ == '__main__':
    executor = ThreadPoolExecutor(max_workers=2)# 建立一個執行緒池的執行器
    task1 = executor.submit(get_html,(3)) # 將程式丟入執行緒池
    task2 = executor.submit(get_html,(2))
    print(task2.done())
    time.sleep(3)
    print(task2.done())
    print(task2.result()) # 獲取返回結果

結果:

False
get html 2 successfully
get html 3 successfully
True
2

注意:

執行緒池物件還有個 cancel() 方法能夠結束未開始執行的執行緒(注意是未開始執行),因為我們現在允許一下子執行兩個執行緒,所以是沒法cancel() 掉的,我們如果想cancel() 的話我們可以將這個最多同時執行兩個改成同時執行一個嗎,這樣就能 cancel()了

2.我們想獲取到執行成功的task 的返回

方法一:使用 as_completed

示例程式碼:

from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
import time

def get_html(times):
    time.sleep(times)
    print("get html {id} successfully".format(id = times))
    return times

if __name__ == '__main__':
    executor = ThreadPoolExecutor(max_workers=2)# 建立一個執行緒池的執行器

    urls = [2,3,4,5,6,7]
    tasks = [executor.submit(get_html,(url)) for url in urls]
    for future in as_completed(tasks):
        data = future.result()
        print("get {data} successfully\n".format(data = data))

方法二:使用 executor 自己的 map 方法

示例程式碼:

from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
import time

def get_html(times):
    time.sleep(times)
    print("get html {id} successfully".format(id = times))
    return times

if __name__ == '__main__':
    executor = ThreadPoolExecutor(max_workers=2)# 建立一個執行緒池的執行器
    urls = [2,3,4,5,6,7]
    for data in executor.map(get_html,urls):
        print("get {data} successfully\n".format(data = data))

3.wait()方法申請等待

wait() 什麼執行緒就會等待這個執行緒執行完再執行其他的執行緒

示例程式碼:

from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
from concurrent.futures import wait
from concurrent.futures import FIRST_COMPLETED
import time

def get_html(times):
    time.sleep(times)
    print("get html {id} successfully".format(id = times))
    return times

if __name__ == '__main__':
    executor = ThreadPoolExecutor(max_workers=2)# 建立一個執行緒池的執行器
    urls = [2,3,4,5,6,7]
    tasks = [executor.submit(get_html,(url)) for url in urls]
    wait(tasks,return_when=FIRST_COMPLETED)
    print('main')

結果:

get html 2 successfully
main
get html 3 successfully
get html 4 successfully
get html 5 successfully
get html 6 successfully
get html 7 successfully