1. 程式人生 > >執行緒和協程

執行緒和協程

執行緒

初識執行緒:

  • 輕量級程序,直接被cpu排程
  • 不能獨立存在的輕量級程序
  • 同一個程序中的多個執行緒之間的資料共享

執行緒和程序的關係:

執行緒和程序的區別可以歸納為以下4點:

  • 地址空間和其他資源(如開啟檔案):程序間相互獨立,同一個程序的各執行緒間共享.某程序內的執行緒在其他程序不可見
  • 通訊:程序間通訊IPC,執行緒間可以直接讀寫程序資料段(如全域性變數)來進行通訊--需要程序同步和互斥手段的輔助,以保證資料的一致性
  • 排程和切換:執行緒上下文切換比程序上下文切換要快的多
  • 在多執行緒作業系統中,程序不是一個可執行的實體

全域性直譯器鎖GIL:

  • 全域性直譯器鎖,是用來鎖執行緒的,Cpython直譯器提供的,導致了同一時刻只能有一個執行緒訪問cpu

python執行緒模組的選擇:

  Python提供了幾個用於多執行緒程式設計的模組,包括thread、threading和Queue等。thread和threading模組允許程式設計師建立和管理執行緒。thread模組提供了基本的執行緒和鎖的支援,threading提供了更高級別、功能更強的執行緒管理的功能。Queue模組允許使用者建立一個可以用於多個執行緒之間共享資料的佇列資料結構。
  避免使用thread模組,因為更高級別的threading模組更為先進,對執行緒的支援更為完善,而且使用thread模組裡的屬性有可能會與threading出現衝突;其次低級別的thread模組的同步原語很少(實際上只有一個),而threading模組則有很多;再者,thread模組中當主執行緒結束時,所有的執行緒都會被強制結束掉,沒有警告也不會有正常的清除工作,至少threading模組能確保重要的子執行緒退出後進程才退出。 

  thread模組不支援守護執行緒,當主執行緒退出時,所有的子執行緒不論它們是否還在工作,都會被強行退出。而threading模組支援守護執行緒,守護執行緒一般是一個等待客戶請求的伺服器,如果沒有客戶提出請求它就在那等著,如果設定一個執行緒為守護執行緒,就表示這個執行緒是不重要的,在程序退出的時候,不用等待這個執行緒退出

threading模組

執行緒的建立:

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello
' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('egon',)) t.start() print('主執行緒') 建立執行緒的方式1
建立執行緒的方式1
from threading import Thread
import time
class Sayhi(Thread):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        time.sleep(2)
        print('%s say hello' % self.name)


if __name__ == '__main__':
    t = Sayhi('egon')
    t.start()
    print('主執行緒')

建立執行緒的方式2
建立執行緒的方式2

Thread類的其他方法:

Thread例項物件的方法
  # isAlive(): 返回執行緒是否活動的。
  # getName(): 返回執行緒名。
  # setName(): 設定執行緒名。

threading模組提供的一些方法:
  # threading.currentThread(): 返回當前的執行緒變數。
  # threading.enumerate(): 返回一個包含正在執行的執行緒的list。正在執行指執行緒啟動後、結束前,不包括啟動前和終止後的執行緒。
  # threading.activeCount(): 返回正在執行的執行緒數量,與len(threading.enumerate())有相同的結果。
例項物件
from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.start()
    t.join()
    print('主執行緒')
    print(t.is_alive())
    '''
    egon say hello
    主執行緒
    False
    '''

join方法
join方法

守護執行緒:

無論在程序還是執行緒,都遵循:守護xx會等待主xx執行完畢後倍銷燬.需要強調的是:執行完畢並非終止執行

#1.對主程序來說,執行完畢指的是主程序程式碼執行完畢
#2.對主執行緒來說,執行完畢指的是主執行緒所在的程序內所有非守護執行緒統統執行完畢,主執行緒才算執行完畢
#1 主程序在其程式碼結束後就已經算執行完畢了(守護程序在此時就被回收),然後主程序會一直等非守護的子程序都執行完畢後回收子程序的資源(否則會產生殭屍程序),才會結束,
#2 主執行緒在其他非守護執行緒執行完畢後才算執行完畢(守護執行緒在此時就被回收)。因為主執行緒的結束意味著程序的結束,程序整體的資源都將被回收,而程序必須保證非守護執行緒都執行完畢後才能結束。
詳細解釋
from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.setDaemon(True) #必須在t.start()之前設定
    t.start()

    print('主執行緒')
    print(t.is_alive())
    '''
    主執行緒
    True
    '''
守護執行緒例

鎖:

互斥鎖:

無論在相同的執行緒還是不同的執行緒,都只能連續acquire一次
要想在acquire,必須先release
互斥鎖

死鎖:

所謂死鎖:就是兩個或者兩個以上的程序或執行緒在執行的過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去.此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的程序成為死鎖程序,如下就是死鎖

from threading import Lock as Lock
import time
mutexA=Lock()
mutexA.acquire()
mutexA.acquire()
print(123)
mutexA.release()
mutexA.release()
死鎖

遞迴鎖:

在同一個程序中.可以無限次的acquire,但是要現在其他程序中也acquire,必須在自己的執行緒中新增和acquire次數相同的release

from threading import RLock as Lock
import time
mutexA=Lock()
mutexA.acquire()
mutexA.acquire()
print(123)
mutexA.release()
mutexA.release()
遞迴鎖Rlook

訊號量:

  • Semaphore管理一個內建的計數器,每當呼叫acquire()時內建的計數器-1,呼叫release()是內建計數器+1,計數器不能小於0,當計數器為0時,acquire()將阻塞執行緒直到其他執行緒呼叫release()
from threading import Thread,Semaphore
import threading
import time
# def func():
#     if sm.acquire():
#         print (threading.currentThread().getName() + ' get semaphore')
#         time.sleep(2)
#         sm.release()
def func():
    sm.acquire()
    print('%s get sm' %threading.current_thread().getName())
    time.sleep(3)
    sm.release()
if __name__ == '__main__':
    sm=Semaphore(5)
    for i in range(23):
        t=Thread(target=func)
        t.start()
例項

事件:

執行緒的一個關鍵特性是每個執行緒都是獨立執行且狀態不可預測。如果程式中的其 他執行緒需要通過判斷某個執行緒的狀態來確定自己下一步的操作,這時執行緒同步問題就會變得非常棘手。為了解決這些問題,我們需要使用threading庫中的Event物件。 物件包含一個可由執行緒設定的訊號標誌,它允許執行緒等待某些事件的發生。在 初始情況下,Event物件中的訊號標誌被設定為假。如果有執行緒等待一個Event物件, 而這個Event物件的標誌為假,那麼這個執行緒將會被一直阻塞直至該標誌為真。一個執行緒如果將一個Event物件的訊號標誌設定為真,它將喚醒所有等待這個Event物件的執行緒。如果一個執行緒等待一個已經被設定為真的Event物件,那麼它將忽略這個事件, 繼續執行

event.isSet():返回event的狀態值;
event.wait():如果 event.isSet()==False將阻塞執行緒;
event.set(): 設定event的狀態值為True,所有阻塞池的執行緒啟用進入就緒狀態, 等待作業系統排程;
event.clear():恢復event的狀態值為False。

條件:

使得執行緒等待,只有滿足某條件是,才釋放n個程序

Python提供的Condition物件提供了對複雜執行緒同步問題的支援。Condition被稱為條件變數,除了提供與Lock類似的acquire和release方法外,還提供了wait和notify方法。執行緒首先acquire一個條件變數,然後判斷一些條件。如果條件不滿足則wait;如果條件滿足,進行一些處理改變條件後,通過notify方法通知其他執行緒,其他處於wait狀態的執行緒接到通知後會重新判斷條件。不斷的重複這一過程,從而解決複雜的同步問題。
詳細說明

定時器:

from threading import Timer
 
def hello():
    print("hello, world")
 
t = Timer(1, hello)
t.start()  # after 1 seconds, "hello, world" will be printed
定時器

執行緒佇列:

queue佇列 :使用import queue,用法與程序Queue一樣

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

class queue.Queue(maxsize=0) #先進先出
import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
結果(先進先出):
first
second
third
'''

先進先出
先進先出

class queue.LifoQueue(maxsize=0) #last in fisrt out

import queue

q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
結果(後進先出):
third
second
first
'''

後進先出
後進後出

class queue.PriorityQueue(maxsize=0) #儲存資料時可設定優先順序的佇列

import queue

q=queue.PriorityQueue()
#put進入一個元組,元組的第一個元素是優先順序(通常是數字,也可以是非數字之間的比較),數字越小優先順序越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())
'''
結果(數字越小優先順序越高,優先順序高的優先出隊):
(10, 'b')
(20, 'a')
(30, 'c')
'''

優先順序佇列
優先順序佇列

concurrent.futures模組:

#1 介紹
concurrent.futures模組提供了高度封裝的非同步呼叫介面
ThreadPoolExecutor:執行緒池,提供非同步呼叫
ProcessPoolExecutor: 程序池,提供非同步呼叫
Both implement the same interface, which is defined by the abstract Executor class.

#2 基本方法
#submit(fn, *args, **kwargs)
非同步提交任務

#map(func, *iterables, timeout=None, chunksize=1) 
取代for迴圈submit的操作

#shutdown(wait=True) 
相當於程序池的pool.close()+pool.join()操作
wait=True,等待池內所有任務執行完畢回收完資源後才繼續
wait=False,立即返回,並不會等待池內的任務執行完畢
但不管wait引數為何值,整個程式都會等到所有任務執行完畢
submit和map必須在shutdown之前

#result(timeout=None)
取得結果

#add_done_callback(fn)
回撥函式
用法

協程

協程介紹:

  • 協程是單執行緒下的併發,又稱微執行緒,纖程
  • 協程是一種使用者態的輕量級執行緒,即協程是由使用者程式自己控制排程的
  • 協程能夠實現在一條執行緒上的多個任務互相切換
  • 為了提高工作效率,使用者可以控制在一個任務中遇到io就切換

對比作業系統控制執行緒的切換,使用者在單執行緒內控制協程的切換

優點如下:

1.協程的切換開銷更小,屬於使用者級別的切換,作業系統完全感知不到,因而更加        輕量級
2.單執行緒內就可以實現併發的效果,最大限度利用cpu

缺點如下:

1.協程的本質是單執行緒下,無法利用多核,可以是一個程式開啟多個程序,每個程序內開啟多個執行緒,每個執行緒內開啟協程
2.協程指的是單個執行緒,因而一旦出現協程阻塞,將會阻塞整個執行緒

greenlet模組:

from greenlet import greenlet

def eat(name):
    print('%s eat 1' %name)
    g2.switch('egon')
    print('%s eat 2' %name)
    g2.switch()
def play(name):
    print('%s play 1' %name)
    g1.switch()
    print('%s play 2' %name)

g1=greenlet(eat)
g2=greenlet(play)

g1.switch('egon')#可以在第一次switch時傳入引數,以後都不需要

greenlet實現狀態切換
greenlet實現狀態切換

greenlet只是提供了一種比generator更加便捷的切換方式,當切到一個任務執行時如果遇到io,那就原地阻塞,仍然是沒有解決遇到IO自動切換來提升效率的問題

gevent模組:

Gevent 是一個第三方庫,可以輕鬆通過gevent實現併發同步或非同步程式設計,在gevent中用到的主要模式是Greenlet, 它是以C擴充套件模組形式接入Python的輕量級協程。 Greenlet全部執行在主程式作業系統程序的內部,但它們被協作式地排程

g1=gevent.spawn(func,1,,2,3,x=4,y=5)建立一個協程物件g1,spawn括號內第一個引數是函式名,如eat,後面可以有多個引數,可以是位置實參或關鍵字實參,都是傳給函式eat的

g2=gevent.spawn(func2)

g1.join() #等待g1結束

g2.join() #等待g2結束

#或者上述兩步合作一步:gevent.joinall([g1,g2])

g1.value#拿到func1的返回值

用法介紹
用法介紹

from gevent import monkey;monkey.patch_all()必須放到被打補丁者的前面,如time,socket模組之前

或者我們乾脆記憶成:要用gevent,需要將from gevent import monkey;monkey.patch_all()放到檔案的開頭