1. 程式人生 > >Python實戰之執行緒(函式多執行緒,類多執行緒,守護執行緒,GIL,執行緒lock,遞迴Rlock,Semaphore訊號量,event)

Python實戰之執行緒(函式多執行緒,類多執行緒,守護執行緒,GIL,執行緒lock,遞迴Rlock,Semaphore訊號量,event)

首先須知

什麼是IO?

從硬碟讀塊資料,從網路讀塊資料屬於IO操作(IO操作不佔用cpu)

計算佔用cpu,如:1+1

Python多執行緒其實就是一個執行緒,由於利用CUP的上下文切換看起來就像是併發..上下文切換消耗資源

Python多執行緒 不適合CPU密集操作型的任務,適合IO操作密集型的任務

大量運算佔CPU儘量少用多執行緒,用單程序更快

sockeserver接收多個網路併發的就是IO操作密集型的

如果一定要使用CPU密集操作型的任務呢?Python怎麼解決?

使用多程序來解決CPU密集操作型的任務。

Python的執行緒是呼叫作業系統的原生執行緒,程序也是呼叫作業系統的原生程序,原生程序是由作業系統自己維護的。Python只是呼叫了C程式碼庫的一個介面啟動程序的,真正的程序管理還是作業系統自己完成的

執行緒摘要

1.執行緒:是作業系統最小的排程單位,是一串指令的集合,被包含在程序裡面

2. 多執行緒共享建立他的進行的記憶體地址空間

3.同時間一核CPU只能執行一個程式,為什麼可以同時上QQ和打遊戲,因為利用上下文機率進行切換的,CPU速度太快所以我們肉眼感覺是同一時間執行的

4.GIL全域性直譯器鎖稱之吉爾,是一個互斥鎖,禁止多個本地執行緒執行Python位元組碼(是Python的缺陷)

5.主執行緒可以建立子執行緒  子執行緒可以繼續建立執行緒  但是主執行緒與子執行緒都是獨立的個體

6.程序和執行緒哪個快?沒有可比性...因為程序執行是需要執行緒執行的

硬要說 一樣快

7 啟動程序和執行緒哪個快?執行緒就一堆指令所以執行緒當然快

 

執行緒的兩種方式(類,函式)

最簡單的執行緒試驗:

[[email protected] Part_nine]# vim shreading_01.py
__author__ = "Burgess Zheng"

import threading
import time

def run(n):#該函式當成一個任務
    print("task",n)
    time.sleep(2)#增加間隔時間測試程序啟動和執行緒啟動的效率

#啟動兩個執行緒執行上面的任務
t1 = threading.Thread(target=run,args=("t1",)) #例項化執行緒
t2 = threading.Thread(target=run,args=("t2",))#例項化執行緒
#啟動兩個執行緒 target:執行目標:run該函式 ,args實參:("tx",) 記住哪怕一個實參也需要逗號,

#執行緒執行
t1.start()
t2.start()

#程序執行
run("t1")
run("t2")

#最後結果,看不出都是一瞬間就顯示出來,歸功於CPU運算快過肉眼
#但是如果加上sleep間隔時間就可以看出執行緒和程序之前的區別

測試結果:

類方式執行緒試驗:

__author__ = "Burgess Zheng"

import threading
import time

#類的方式 多執行緒啟動試驗#一般不這麼寫 一般都是已函式的方式寫

class MyThread(threading.Thread):
    def __init__(self,n,sleep_time):
        super(MyThread,self).__init__()
        #由於把父類的建構函式重寫了就需要這條語句繼承父類的建構函式
        self.n = n
        self.sleep_time = sleep_time
    def run(self):#類裡面被多執行緒執行的函式必須叫做run
        print("runnint task",self.n)
        time.sleep(self.sleep_time)
        print("task done:",self.n)

'''
t1 = MyThread("t1")
t2 = MyThread("t2")
t1.start()
t2.start() #兩個執行緒同執行
'''

'''
t1 = MyThread("t1")
t2 = MyThread("t2")
t1.start()
t1.join()
    #python語言:join = wait  等的意思
    #把並行變成了序列,等t1.start()該執行緒執行完畢以後才執行t2執行緒和主執行緒
t2.start()
'''

'''
t1 = MyThread("t1")
t2 = MyThread("t2")
#我要讓主執行緒等所有的子執行緒執行完畢在執行主執行緒
t1.start()
t2.start()
t1.join()
#等上面的執行緒全部執行完畢了以後,主執行緒才往下執行
#但是記住只是等t1執行完畢了以後,主執行緒就繼續往下執行
#如果此時t2執行緒的執行時間>t1執行緒的執行時間,主執行緒不會等t2執行完畢在繼續執行
print("sfasdf")
'''

#往類建構函式增加多一個實參(時間)測試執行緒時間不同主程序等等的時間
t1 = MyThread("t1",2)
t2 = MyThread("t2",4)
t1.start()
t2.start()
t1.join()
print("sfasdf")
#結果主執行緒只等到t1執行緒結束以後繼續執行,不會等待t2執行緒後才執行
#所以測試執行用時的時候,只顯示執行2秒,那是t1的執行用時間
#所以你如果要整個子執行緒執行用時計算出來
# 就應該t2.join()這樣主執行緒才會等待2個子執行緒執行完才執行,計算的用時就是4秒

測試結果:

函式for迴圈啟動多執行緒試驗(主執行緒和子執行緒的執行原理)  

_author__ = "Burgess Zheng"

import threading
import time

def run(n):#該函式當成一個任務
    print("task",n)
    time.sleep(2)#增加間隔時間測試程序啟動和執行緒啟動的效率
    print("task done")


'''
#for迴圈啟動多個執行緒,總不能我要啟動一個執行緒就複製一個
start_time = time.time()
for i in range(10):
    t = threading.Thread(target=run,args=("t%s" %i,)) #例項化執行緒
    t.start()
print("---------------all threads has finished")
print("cost:",time.time() - start_time)
#執行緒執行的時候結果每個執行緒同時先執行了函式run 第一條命令print("task",n)
#但是主執行緒不會等子執行緒執行完畢在執行,而是直接繼續往下執行
      #print("---------------all threads has finished")
      #print("cost:",time.time() - start_time)
#主執行緒啟動了子執行緒後,子執行緒和主執行緒是獨立的,主執行緒不會等子執行緒執行完在執行
#主執行緒和子執行緒執行是並行了,同時執行的,分別處理自己的事情,互補干擾
'''

#預設主執行緒是不會等子執行緒執行完在執行的,
# 但是如果有特定的場景需要計算全部子執行緒執行完畢的時間
# 可以利用引數做到
start_time = time.time()
t_objs = []#生產個空列表
for i in range(10):
    t = threading.Thread(target=run,args=("t%s" %i,)) #例項化執行緒
    t.start()
    #t.join() #如果在這裡使用join就會導致啟動1個執行緒就得到結果在啟動另外一個執行緒就是串行了
              #而我們現在的要求是一次性50個執行緒進行join,需要在外面寫個迴圈
    t_objs.append(t) #把t迴圈結果值追加到t_objs該列表

for t in t_objs:
    t.join()

print("---------------all threads has finished")
print("cost:",time.time() - start_time)
#這樣主執行緒就會等所有子執行緒執行完畢在繼續執行

測試結果:

    

結果:主執行緒只負責呼叫子執行緒,但是不會等待子執行緒執行後才執行主執行緒和子執行緒是同時並行執行的,如果注執行緒需要等子執行緒執行完才執行需要用到命令x.join()

守護執行緒試驗(守護執行緒的原理)

__author__ = "Burgess Zheng"

import threading
import time

def run(n):
    print("task ",n )
    time.sleep(2)
    print("task done",n,threading.current_thread())
                            #threading.current_thread():檢視是主執行緒執行還是子執行緒執行

start_time = time.time()
t_objs = [] #存執行緒例項
for i in range(5):
    t = threading.Thread(target=run,args=("t-%s" %i ,)) #例項化執行緒
    t.setDaemon(True)
#.setDaemon(True) 把當前執行緒設定成守護執行緒
# 這樣主執行緒執行完畢,不會在乎子程序是否執行完畢,直接退出整個程式
    t.start()
    t_objs.append(t) #為了不阻塞後面執行緒的啟動,不在這裡join,先放到一個列表裡

print("---all threads has finished...",threading.current_thread(),threading.active_count())
                                           #threading.current_thread() :檢視執行的執行緒是主執行緒還是子
程序
                                           #threading.active_count()當前活動執行緒的個數
print("cost:",time.time() - start_time)

測試結果:

守護執行緒:主執行緒執行完畢直接程式關閉,不會在乎子執行緒是否已經執行完畢了。 我們清楚主執行緒只是呼叫子執行緒取執行但是和子執行緒是並行執行的,不存在所謂主執行緒等待子執行緒

應用場景:一個php程式,每個使用者連線進來php程式會啟動一個執行緒,php本身就是程式,啟動的時候就是一個守護程序,程序是無法執行的而是依靠主執行緒去執行的,我總不能把PHP服務關閉掉 執行緒還繼續執行吧?

setDaemon(True) 把當前執行緒設定成守護執行緒

threading.current_thread():檢視執行的執行緒是主執行緒還是子程序

threading.active_count():當前活動執行緒的個數

threading.get_ident():顯示執行緒號

 

Python GIL

Python GIL(Global Interpreter Lock)  

In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)

全域性直譯器鎖在CPython,或吉爾,是一個互斥鎖,防止多個本地執行緒執行Python位元組碼。這把鎖是必要的,主要是因為CPython的記憶體管理不是執行緒安全的。(然而,由於吉爾存在,其他功能已經習慣於依賴保證執行)

上面的核心意思就是,無論你啟多少個執行緒,你有多少個cpu, Python在執行的時候會淡定的在同一時刻只允許一個執行緒執行,擦。。。,那這還叫什麼多執行緒呀?莫如此早的下結結論,聽我現場講。

首先需要明確的一點是GIL並不是Python的特性,它是在實現Python解析器(CPython)時所引入的一個概念。就好比C++是一套語言(語法)標準,但是可以用不同的編譯器來編譯成可執行程式碼。有名的編譯器例如GCC,INTEL C++,Visual C++等。Python也一樣,同樣一段程式碼可以通過CPython,PyPy,Psyco等不同的Python執行環境來執行。像其中的JPython就沒有GIL。然而因為CPython是大部分環境下預設的Python執行環境。所以在很多人的概念裡CPython就是Python,也就想當然的把GIL歸結為Python語言的缺陷。所以這裡要先明確一點:GIL並不是Python的特性,Python完全可以不依賴於GIL

Cpython GIL的缺陷

無論多少個CPU 多少個執行緒,只調用一個執行緒執行,其餘的就候著不做事

python是C語言寫的 

Python的直譯器是呼叫C語言的介面

GIL只是Cpython的缺陷,並不是整個python的缺陷 未來的趨勢PyPy

Cpython去掉GIL是不可能的。但是有折中的辦法後面會講

 

執行緒鎖(互斥鎖Mutex)

全域性GIL鎖的原理,雖然也是保證1個執行緒修改該資料,為什麼還需要執行緒鎖?但是因為多執行緒對該資料進行了copy修改,最後才覆蓋得到結果,很多時候還是導致出問題

一個程序下可以啟動多個執行緒,多個執行緒共享父程序的記憶體空間,也就意味著每個執行緒可以訪問同一份資料,此時,如果2個執行緒同時要修改同一份資料,會出現什麼狀況?

正常來講,這個num結果應該是0 但在python 2.7上多執行幾次,會發現,最後打印出來的num結果不總是0,為什麼每次執行的結果不一樣呢? 哈,很簡單,假設你有A,B兩個執行緒,此時都 要對num 進行減1操作, 由於2個執行緒是併發同時執行的,所以2個執行緒很有可能同時拿走了num=100這個初始變數交給cpu去運算,當A執行緒去處完的結果是99,但此時B執行緒運算完的結果也是99,兩個執行緒同時CPU運算的結果再賦值給num變數後,結果就都是99。那怎麼辦呢? 很簡單,每個執行緒在要修改公共資料時,為了避免自己在還沒改完的時候別人也來修改此資料,可以給這個資料加一把鎖, 這樣其它執行緒想修改此資料時就必須等待你修改完畢並把鎖釋放掉後才能再訪問此資料。 

*注:不要在3.x上執行,不知為什麼,3.x上的結果總是正確的,可能是自動加了鎖

 

執行緒鎖實驗:

#-*- coding:utf-8 -*-
__author__ = "Burgess Zheng"

import threading
import time

def run(n):
    lock.acquire()#獲取一把鎖
    global num#修改全域性變數
    num +=1
    #time.sleep(1)
#記住加鎖以後只有一個執行緒修改才釋放
# 如果sleep的話,有所少個執行緒操作就要等多少秒,有鎖的時候最好別使用sleep
    lock.release()#釋放鎖

lock = threading.Lock()#全域性:生成鎖的例項
num = 0
t_objs = [] #存執行緒例項
for i in range(5):
    t = threading.Thread(target=run,args=("t-%s" %i ,)) #例項化執行緒
    t.start()
    t_objs.append(t) #所有執行緒假如列表
#所有執行緒都對run該函式進行了修改,但是Python3得到num結果正常,可能自動加鎖了,
# Python2就出問題需要手動加鎖#針對Python2的問題進行加鎖

for t in t_objs: #迴圈執行緒例項列表,等待所有執行緒執行完畢
    t.join()

print("----------all threads has finished...",threading.current_thread(),threading.active_count())

print("num:",num)

GIL VS Lock 

就是既然你之前說過了,Python已經有一個GIL來保證同一時間只能有一個執行緒來執行了,為什麼這裡還需要lock? 注意啦,這裡的lock是使用者級的lock,跟那個GIL沒關係 ,具體我們通過下圖來看一下+配合我現場講給大家,就明白了。

那你又問了, 既然使用者程式已經自己有鎖了,那為什麼C python還需要GIL呢?加入GIL主要的原因是為了降低程式的開發的複雜度,比如現在的你寫python不需要關心記憶體回收的問題,因為Python直譯器幫你自動定期進行記憶體回收,你可以理解為python直譯器裡有一個獨立的執行緒,每過一段時間它起wake up做一次全域性輪詢看看哪些記憶體資料是可以被清空的,此時你自己的程式 裡的執行緒和 py直譯器自己的執行緒是併發執行的,假設你的執行緒刪除了一個變數,py直譯器的垃圾回收執行緒在清空這個變數的過程中的clearing時刻,可能一個其它執行緒正好又重新給這個還沒來及得清空的記憶體空間賦值了,結果就有可能新賦值的資料被刪除了,為了解決類似的問題,python直譯器簡單粗暴的加了鎖,即當一個執行緒執行時,其它人都不能動,這樣就解決了上述的問題,  這可以說是Python早期版本的遺留問題。

RLock(遞迴鎖)

說白了就是在一個大鎖中還要再包含子鎖就需要Rlock 而是不lock 如果lock就會導致解鎖出現問題,變死迴圈

簡單一點就是 如果多執行緒呼叫一個任務而該函式是有lock鎖的 但是該函式裡面又執行呼叫另外的函式,另外的函式也有lock鎖,就會導致解鎖的時候出現死迴圈

解決:全域性生成鎖的時候使用RLock遞迴鎖

 

遞迴鎖實驗:

#-*- coding:utf-8 -*-
__author__ = "Burgess Zheng"

import threading, time


def run1():
    print("grab the first part data")
    lock.acquire()
    global num
    num += 1
    lock.release()
    return num


def run2():
    print("grab the second part data")
    lock.acquire()
    global num2
    num2 += 1
    lock.release()
    return num2


def run3():
    lock.acquire()
    res = run1()
    print('--------between run1 and run2-----')
    res2 = run2()
    lock.release()
    print(res, res2)


num, num2 = 0, 0
lock = threading.RLock()#RLock當出現多重鎖的時候就需要用Rlock#否則出錯死迴圈
for i in range(3):
    t = threading.Thread(target=run3) #迴圈執行run3,3次
                                  #例項化執行緒
    t.start()

while threading.active_count() != 1:
    print(threading.active_count())
else:
    print('----all threads done---')
    print(num, num2)

測試結果:

Semaphore(訊號量)

互斥鎖 同時只允許一個執行緒更改資料,而Semaphore是同時允許一定數量的執行緒更改資料 ,比如廁所有3個坑,那最多隻允許3個人上廁所,後面的人只能等裡面有人出來了才能再進去。

 

Semaphore實驗:

__author__ = "Burgess Zheng"

import threading, time


def run(n):
    semaphore.acquire()#訊號量獲取多把鎖
    time.sleep(1)
    print("run the thread: %s\n" % n)
    semaphore.release()#訊號量釋放多把鎖

if __name__ == '__main__':
    semaphore = threading.BoundedSemaphore(5)  # 最多允許5個執行緒同時執行
    for i in range(10):
        t = threading.Thread(target=run, args=(i,)) #例項化執行緒
        t.start()
while threading.active_count() != 1:#根join效果是一樣的,執行緒不等於1的時候不往下走
    pass  # print threading.active_count()
else:
    print('----all threads done---')
    #print(num)

測試結果:

Events

An event is a simple synchronization object;

一個事件是一個簡單的同步物件;

the event represents an internal flag, and threads

can wait for the flag to be set, or set or clear the flag themselves.

事件代表一個內部標記,和執行緒
可以等待標誌被設定,或者設定或清除標誌本身。

event = threading.Event()

event.wait()# a client thread can wait for the flag to be set客戶端執行緒等待標記設定

event.set()# a server thread can set or reset it一個伺服器執行緒可以設定或重置它

event.clear()

If the flag is set, the wait method doesn’t do anything.

如果設定了標記,方法不做任何事。

If the flag is cleared, wait will block until it becomes set again.

如果標誌被清除,等待會阻塞,直到它再次被設定。

Any number of threads may wait for the same event.

任意數量的執行緒可能等待相同的事件。

通過Event來實現兩個或多個執行緒間的互動,下面是一個紅綠燈的例子,即起動一個執行緒做交通指揮燈,生成幾個執行緒做車輛,車輛行駛按紅燈停,綠燈行的規則

Event的作用就是標誌位,如果有標誌位代表通行,如果沒有標誌位代表阻塞

Event試驗:

__author__ = "Burgess Zheng"

import time
import threading


event = threading.Event()

def lighter():
    count = 0
    event.set()#設定標誌位
    while True:
        if count > 5 and count < 10:#大於5 小於10 清空標誌位,變阻塞
            event.clear()#把標誌位清空了,變阻塞
            print("\033[41;1mred light is on...\033[0m")
        elif count >10:#大於10就改成綠燈
            event.set()#設定標誌位,變通行
            count = 0#清空標誌位
        else:
            print("\033[42;1mgreen light is on...\033[0m")
        time.sleep(1)
        count +=1

def car(name):
    while True:
        if event.is_set():#判斷標誌位的設定狀態,設定代表通行,迴圈執行下面命令
            print("[%s] running..." % name)
            time.sleep(1)#設定才看的出效果
        else:#如果沒有標誌位就迴圈執行下面的命令
            print("[%s] sees red light,waiting..")
            event.wait()#等待標誌位,如果沒有標誌位,就等到有標誌位才繼續迴圈執行
            print("\033[34;1m[%s] green light is on ,start going..\033[0m" %name)

light = threading.Thread(target=lighter,)#例項化執行緒
light.start()
car1 = threading.Thread(target=car,args=("Tesla",))#例項化執行緒
car1.start()

測試結果: