1. 程式人生 > >鎖,執行緒安全,執行緒池,threading.local,生產者,消費者模型

鎖,執行緒安全,執行緒池,threading.local,生產者,消費者模型

一丶鎖

1.鎖:LOCK(一次放一個)

執行緒安全,多執行緒操作的時候,內部會讓所有執行緒排隊處理,如list/dict/Queue

執行緒不安全 + 人 => 排隊處理

需求:

  a.建立100個執行緒,在列表中追加8

  b.建立100個執行緒

    v = []     鎖     把自己的新增到列表中。   在讀取列表的最後一個。     解鎖

import threading
import time

v = []
lock = threading.Lock()

def func(arg):
    lock.acquire()
    v.append(arg)
    time.sleep(
0.01) m = v[-1] print(arg,m) lock.release() for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start()
LOCK

2.鎖   Rlock(一次放一個)

import threading
import time

v = []
lock = threading.RLock()
def func(arg):
    lock.acquire()
    lock.acquire()

    v.append(arg)
    time.sleep(
0.01) m = v[-1] print(arg,m) lock.release() lock.release() for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start()
RLOCK

3.鎖 BoundedSemaphone(一次放n個) 訊號量

import time
import threading

lock = threading.BoundedSemaphore(3)
def func(arg):
    
lock.acquire() print(arg) time.sleep(1) lock.release() for i in range(20): t =threading.Thread(target=func,args=(i,)) t.start()
BoundedSemaphore

4.鎖  Condition(1次方法x個)

import time
import threading

lock = threading.Condition()

# ############## 方式一 ##############

def func(arg):
    print('執行緒進來了')
    lock.acquire()
    lock.wait() # 加鎖

    print(arg)
    time.sleep(1)

    lock.release()


for i in range(10):
    t =threading.Thread(target=func,args=(i,))
    t.start()

while True:
    inp = int(input('>>>'))

    lock.acquire()
    lock.notify(inp)
    lock.release()
方法一
def xxxx():
    print('來執行函數了')
    input(">>>")
    # ct = threading.current_thread() # 獲取當前執行緒
    # ct.getName()
    return True

def func(arg):
    print('執行緒進來了')
    lock.wait_for(xxxx)
    print(arg)
    time.sleep(1)

for i in range(10):
    t =threading.Thread(target=func,args=(i,))
    t.start()
方法二

5.鎖  Event(一次放所有)

import time
import threading

lock = threading.Event()


def func(arg):
    print('執行緒來了')
    lock.wait() # 加鎖:紅燈
    print(arg)


for i in range(10):
    t =threading.Thread(target=func,args=(i,))
    t.start()

input(">>>>")
lock.set() # 綠燈


lock.clear() # 再次變紅燈

for i in range(10):
    t =threading.Thread(target=func,args=(i,))
    t.start()

input(">>>>")
lock.set()
Event

總結:

  執行緒安全,列表和字典執行緒安全;   為什麼要加鎖?     - 非執行緒安全     - 控制一段程式碼

二丶threading.local原理

1.threading.local

作用:內部自動為每一個執行緒維護一個空間(字典),用於當前存取的屬於自己的值,保證執行緒之間的資料隔離

{

  執行緒ID:{...}

  執行緒ID:{...}

  執行緒ID:{...}

}

2.threading.loacl 原理

import time
import threading

DATA_DICT = {}

def func(arg):
    ident = threading.get_ident()
    DATA_DICT[ident] = arg
    time.sleep(1)
    print(DATA_DICT[ident],arg)


for i in range(10):
    t =threading.Thread(target=func,args=(i,))
    t.start()
原理一
import time
import threading
INFO = {}
class Local(object):

    def __getattr__(self, item):
        ident = threading.get_ident()
        return INFO[ident][item]

    def __setattr__(self, key, value):
        ident = threading.get_ident()
        if ident in INFO:
            INFO[ident][key] = value
        else:
            INFO[ident] = {key:value}

obj = Local()

def func(arg):
    obj.phone = arg # 呼叫物件的 __setattr__方法(“phone”,1)
    time.sleep(2)
    print(obj.phone,arg)


for i in range(10):
    t =threading.Thread(target=func,args=(i,))
    t.start()
原理二

三丶關於執行緒安全 

你的PC或者筆記本還是單核嗎? 如果是,那你已經out了.

隨著納米技術的不斷進步, 計算機晶片的工藝也在進步,但是已經很難在工藝上的改進來提高 運算速度而滿足 摩爾定理, 所以intel, amd相繼在採用橫向的擴充套件即增加更多的CPU, 從而雙核, 4核, N核不斷推出,於是我們進入了多核時代.

於是一個問題出現了, 多核時代的出現對於我們程式設計師而言意味著什麼, 我們如何利用多核的優勢?

在回答這個問題之前,建議對 程序 和 執行緒 不熟悉的讀者可以先補下相關的知識.

當然方案是,可以採用 多程序, 也可以採用 多執行緒. 二者的最大區別就是, 是否共享資源, 後者是共享資源的,而前者是獨立的. 所以你也可能想起了google chrome為什麼又開始使用獨立的程序 來作為每個tab服務了(不共享資料,意味著有更好的安全性).

相對於程序的輕型特徵,多執行緒環境有個最大的問題就是 如何保證資源競爭,死鎖, 資料修改等.

於是,便有了 執行緒安全 (thread safety)的提出.

執行緒安全
Thread safety is a computer programming concept applicable in the context of multi-threaded programs.
A piece of code is thread-safe if it functions correctly during simultaneous execution by multiple threads.
In particular, it must satisfy the need for multiple threads to access the same shared data,
and the need for a shared piece of data to be accessed by only one thread at any given time.
上面是wikipedia中的解釋, 換句話說, 執行緒安全 是在多執行緒的環境下, 執行緒安全能夠保證多個執行緒同時執行時程式依舊執行正確, 而且要保證對於共享的資料,可以由多個執行緒存取,但是同一時刻只能有一個執行緒進行存取.

既然,多執行緒環境下必須存在資源的競爭,那麼如何才能保證同一時刻只有一個執行緒對共享資源進行存取?

加鎖, 對, 加鎖可以保證存取操作的唯一性, 從而保證同一時刻只有一個執行緒對共享資料存取.

通常加鎖也有2種不同的粒度的鎖:

fine-grained(所謂的細粒度), 那麼程式設計師需要自行地加,解鎖來保證執行緒安全
coarse-grained(所謂的粗粒度), 那麼語言層面本身維護著一個全域性的鎖機制,用來保證執行緒安全
前一種方式比較典型的是 java, Jython 等, 後一種方式比較典型的是 CPython (即Python).

前一種本文不進行討論, 具體可參考 java 中的多執行緒程式設計部分.

至於Python中的全域性鎖機制,也即 GIL (Global Interpreter Lock), 下面主要進行一些討論.

GIL

什麼是 GIL ? 答案可參考wikipedia中的說明, 簡單地說就是:

每一個interpreter程序,只能同時僅有一個執行緒來執行, 獲得相關的鎖, 存取相關的資源.

那麼很容易就會發現,如果一個interpreter程序只能有一個執行緒來執行, 多執行緒的併發則成為不可能, 即使這幾個執行緒之間不存在資源的競爭.

從理論上講,我們要儘可能地使程式更加並行, 能夠充分利用多核的功能, 那麼Python為什麼要使用 全域性的 GIL 來限制這種並行呢?

這個問題,其實已經得到了很多的討論, 不止十年, 可以參考下面的文件:

反對 GIL 的聲音:

An open letter to Guido van Rossum (這個文章值得一看,下面有很多的留言也值得一看)
認為 GIL 不能去除的:

It isn't Easy to Remove the GIL (這個文章來自python作者 Guido, 他說明了什麼要使用 GIL)
其它的一些討論很容易從Google來搜尋得到, 譬如: GIL at google.

那麼,簡單總結下雙方的觀點.

認為應該去除 GIL 的:

不順應計算機的發展潮流(多核時代已經到來, 而 GIL 會很影響多核的使用)
大幅度提升多執行緒程式的速度
認為不應該去除 GIL 的(如果去掉,會):

寫python的擴充套件(module)時會遇到鎖的問題,程式設計師需要繁瑣地加解鎖來保證執行緒安全
會較大幅度地減低單執行緒程式的速度
後者是 Guido 最為關切的, 也是不去除 GIL 最重要的原因, 一個簡單的嘗試是在1999年(十年前), 最終的結果是導致單執行緒的程式速度下降了幾乎2倍.

歸根結底,其實就是多程序與多執行緒的選擇問題, 有一段話比較有意思, 可以參考 http://www.artima.com/forums/flat.jsp?forum=106&thread=214235.

我引用如下:

I actually don't think removing the GIL is a good solution.
But I don't think threads are a good solution, either.
They're too hard to get right, and I say that after spending literally years studying threading in both C++ and Java.
Brian Goetz has taken to saying that no one can get threading right.
引自 Bruce Eckel 對 Guido 的回覆. 而 Bruce Eckel 是何許人, 如果你瞭解 java 或者 C++, 那麼應該不會不知道他.

個人的觀點

那麼,從我自己的角度來看(我沒有太多的多執行緒程式設計經驗), 先不論多執行緒的速度優勢等,我更加喜歡多程序的是:

簡單,無需要人為(或者語言級別)的加解鎖. 想想 java 中的多執行緒程式設計,程式設計師通常會在此處出錯(java程式設計師可以思考下)
安全, 這也是瀏覽器為什麼開始使用多程序的一個原因
依照Python自身的哲學, 簡單 是一個很重要的原則,所以, 使用 GIL 也是很好理解的.

當然你真的需要充分利用多核的速度優勢,此時python可能並非你最佳的選擇,請考慮別的語言吧,如 java, erlang 等.
作者連結:https://www.cnblogs.com/mindsbook/archive/2009/10/15/thread-safety-and-GIL.html
執行緒安全的觀點

 四丶執行緒池

# ######################## 執行緒####################
import time
import threading

def task(arg):
    time.sleep(50)

while True:
    num = input('>>>')
    t = threading.Thread(target=task,args=(num,))
    t.start()
執行緒
######################## 執行緒池 ###################
import time
from concurrent.futures import ThreadPoolExecutor

def task(arg):
    time.sleep(50)

pool = ThreadPoolExecutor(20)
while True:
    num = input('>>>')
    pool.submit(task,num)
執行緒池

 五丶生產者消費模型

   三部件:

      生產者

        佇列:先進先出

          擴充套件:  棧,後進先出

      消費者

  問:生產者,消費者模型解決了什麼問題:解決了不同等待的問題

import time
import queue
import threading
q = queue.Queue() # 執行緒安全

def producer(id):
    """
    生產者
    :return:
    """
    while True:
        time.sleep(2)
        q.put('包子')
        print('廚師%s 生產了一個包子' %id )

for i in range(1,4):
    t = threading.Thread(target=producer,args=(i,))
    t.start()


def consumer(id):
    """
    消費者
    :return:
    """
    while True:
        time.sleep(1)
        v1 = q.get()
        print('顧客 %s 吃了一個包子' % id)

for i in range(1,3):
    t = threading.Thread(target=consumer,args=(i,))
    t.start()

 六丶關於join

Python多執行緒與多程序中join()方法的效果是相同的。

下面僅以多執行緒為例:

首先需要明確幾個概念:

知識點一:當一個程序啟動之後,會預設產生一個主執行緒,因為執行緒是程式執行流的最小單元,當設定多執行緒時,主執行緒會建立多個子執行緒,在python中,預設情況下(其實就是setDaemon(False)),主執行緒執行完自己的任務以後,就退出了,此時子執行緒會繼續執行自己的任務,直到自己的任務結束,例子見下面一。

知識點二:當我們使用setDaemon(True)方法,設定子執行緒為守護執行緒時,主執行緒一旦執行結束,則全部執行緒全部被終止執行,可能出現的情況就是,子執行緒的任務還沒有完全執行結束,就被迫停止,例子見下面二。

知識點三:此時join的作用就凸顯出來了,join所完成的工作就是執行緒同步,即主執行緒任務結束之後,進入阻塞狀態,一直等待其他的子執行緒執行結束之後,主執行緒在終止,例子見下面三。

知識點四:join有一個timeout引數:

    1. 當設定守護執行緒時,含義是主執行緒對於子執行緒等待timeout的時間將會殺死該子執行緒,最後退出程式。所以說,如果有10個子執行緒,全部的等待時間就是每個timeout的累加和。簡單的來說,就是給每個子執行緒一個timeout的時間,讓他去執行,時間一到,不管任務有沒有完成,直接殺死。
    2. 沒有設定守護執行緒時,主執行緒將會等待timeout的累加和這樣的一段時間,時間一到,主執行緒結束,但是並沒有殺死子執行緒,子執行緒依然可以繼續執行,直到子執行緒全部結束,程式退出。

一:Python多執行緒的預設情況

import threading
import time

def run():
    time.sleep(2)
    print('當前執行緒的名字是: ', threading.current_thread().name)
    time.sleep(2)


if __name__ == '__main__':

    start_time = time.time()

    print('這是主執行緒:', threading.current_thread().name)
    thread_list = []
    for i in range(5):
        t = threading.Thread(target=run)
        thread_list.append(t)

    for t in thread_list:
        t.start()

    print('主執行緒結束!' , threading.current_thread().name)
    print('一共用時:', time.time()-start_time)
預設情況

其執行結果如下

關鍵點:

  1. 我們的計時是對主執行緒計時,主執行緒結束,計時隨之結束,打印出主執行緒的用時。
  2. 主執行緒的任務完成之後,主執行緒隨之結束,子執行緒繼續執行自己的任務,直到全部的子執行緒的任務全部結束,程式結束。

二:設定守護執行緒

import threading
import time

def run():

    time.sleep(2)
    print('當前執行緒的名字是: ', threading.current_thread().name)
    time.sleep(2)


if __name__ == '__main__':

    start_time = time.time()

    print('這是主執行緒:', threading.current_thread().name)
    thread_list = []
    for i in range(5):
        t = threading.Thread(target=run)
        thread_list.append(t)

    for t in thread_list:
        t.setDaemon(True)
        t.start()

    print('主執行緒結束了!' , threading.current_thread().name)
    print('一共用時:', time.time()-start_time)
守護執行緒

其執行結果如下,注意請確保setDaemon()在start()之前。

關鍵點:

  1. 非常明顯的看到,主執行緒結束以後,子執行緒還沒有來得及執行,整個程式就退出了。

三:join的作用

import threading
import time

def run():

    time.sleep(2)
    print('當前執行緒的名字是: ', threading.current_thread().name)
    time.sleep(2)


if __name__ == '__main__':

    start_time = time.time()

    print('這是主執行緒:', threading.current_thread().name)
    thread_list = []
    for i in range(5):
        t = threading.Thread(target=run)
        thread_list.append(t)

    for t in thread_list:
        t.setDaemon(True)
        t.start()

    for t in thread_list:
        t.join()

    print('主執行緒結束了!' , threading.current_thread().name)
    print('一共用時:', time.time()-start_time)
join...

其執行結果如下:

關鍵點:

  1. 可以看到,主執行緒一直等待全部的子執行緒結束之後,主執行緒自身才結束,程式退出。

 總結:

 start: 現在準備就緒,等待CPU排程

    setName: 為執行緒設定名稱

    getName: 獲取執行緒名稱

    setDaemon: 設定為後臺執行緒或前臺執行緒(預設),如果是後臺執行緒,主執行緒執行過程中,後臺執行緒也在進行,主執行緒執行完畢後,後臺執行緒不論成功與否,均停止.如果是前臺執行緒,主執行緒執行過程中,前臺執行緒也在進行,主執行緒執行完畢後,等待前臺執行緒也執行完成後,程式停止.

    join: 逐個執行每個執行緒,執行完畢後繼續往下執行,該方法使得多執行緒變得無意義

    run: 執行緒被CPU排程後自動執行執行緒物件的run方法

ps:小補充

在指令碼執行過程中有一個主執行緒,若在主執行緒中建立了子執行緒,當主執行緒結束時根據子執行緒daemon屬性值的不同可能會發生下面的兩種情況之一:

如果某個子執行緒的daemon屬性為False,主執行緒結束時會檢測該子執行緒是否結束,如果該子執行緒還在執行,則主執行緒會等待它完成後再退出;

如果某個子執行緒的daemon屬性為True,主執行緒執行結束時不對這個子執行緒進行檢查而直接退出,同時所有daemon值為True的子執行緒將隨主執行緒一起結束,而不論是否執行完成。

屬性daemon的值預設為False,如果需要修改,必須在呼叫start()方法啟動執行緒之前進行設定。另外要注意的是,上面的描述並不適用於IDLE環境中的互動模式或指令碼執行模式,因為在該環境中的主執行緒只有在退出Python IDLE時才終止。



import threading

import time



#繼承Thread類,建立自定義執行緒類

class mythread(threading.Thread):

    def __init__(self, num, threadname):

        threading.Thread.__init__(self, name=threadname)

        self.num = num



    #重寫run()方法

    def run(self):

        time.sleep(self.num)

        print(self.num)



#建立自定義執行緒類物件,daemon預設為False

t1 = mythread(1, 't1')

t2 = mythread(5, 't2')

#設定執行緒物件t2的daemon屬性為True

t2.daemon = True

print(t1.daemon)

print(t2.daemon)

#啟動執行緒

t1.start()

t2.start()



把上面的程式碼儲存為ThreadDaemon.py檔案,在IDLE環境中執行結果如下圖所示



在命令提示符環境中執行結果如下圖所示。



可以看到,在命令提示符環境中執行該程式時,執行緒t2沒有執行結束就跟隨主執行緒一同結束了,因此並沒有輸出數字5。
Python多執行緒程式設計中daemon屬性的作用