1. 程式人生 > >flask之分析執行緒和協程

flask之分析執行緒和協程

flask之分析執行緒和協程

01 思考:每個請求之間的關係

我們每一個請求進來的時候都開一個程序肯定不合理,那麼如果每一個請求進來都是序列的,那麼根本實現不了併發,所以我們假定每一個請求進來使用的是執行緒。

那麼執行緒中資料互相不隔離,存在修改資料的時候資料不安全的問題。

假定我們的需求是,每個執行緒都要設定值,並且該執行緒列印該執行緒修改的值。

from threading import Thread,current_thread
import time

class Foo(object):
    def __init__(self):
        self.name = 0

locals_values = Foo()

def func(num):
    locals_values.name = num
    time.sleep(2)             # 取出該執行緒的名字
    print(locals_values.name, current_thread().name)

for i in range(10):
                                    # 設定該執行緒的名字
    t = Thread(target=func,args=(i,),name='執行緒%s'%i)
    t.start()

很明顯阻塞了2秒的時間所有的執行緒都完成了修改值,而2秒後所有的執行緒打印出來的時候都是9了,就產生了資料不安全的問題。

所以我們要解決這種執行緒不安全的問題,有如下兩種解決方案。

  • 方案一:是加鎖

  • 方案二:使用threading.local物件把要修改的資料複製一份,使得每個資料互不影響。

    我們要實現的併發是多個請求實現併發,而不是純粹的只是修改一個數據,所以第二種思路更適合做我們每個請求的併發,把每個請求物件的內容都複製一份讓其互相不影響。

    詳解:為什麼不用加鎖的思路?加鎖的思路是多個執行緒要真正實現共用一個數據,並且該執行緒修改了資料之後會影響到其他執行緒,更適合類似於12306搶票的應用場景,而我們是要做請求物件的併發,想要實現的是該執行緒對於請求物件這部分內容有任何修改並不影響其他執行緒。所以使用方案二

02 threading.local

多個執行緒修改同一個資料,複製多份資料給每個執行緒用,為每個執行緒開闢一塊空間進行資料儲存

例項:

from threading import Thread,current_thread,local
import time

locals_values = local()
# 可以簡單理解為,識別到新的執行緒的時候,都會開闢一片新的記憶體空間,相當於每個執行緒對該值進行了拷貝。

def func(num):
    locals_values.name = num
    time.sleep(2)
    print(locals_values.name, current_thread().name)

for i in range(10):
    t = Thread(target=func,args=(i,),name='執行緒%s'%i)
    t.start()

如上通過threading.local例項化的物件,實現了多執行緒修改同一個資料,每個執行緒都複製了一份資料,並且修改的也都是自己的資料。達到了我們想要的效果。

03 通過字典自定義threading.local

例項:

from threading import get_ident,Thread,current_thread
# get_ident()可以獲取每個執行緒的唯一標記,
import time

class Local(object):
    storage = {}# 初始化一個字典
    get_ident = get_ident # 拿到get_ident的地址
    def set(self,k,v):
        ident =self.get_ident()# 獲取當前執行緒的唯一標記
        origin = self.storage.get(ident)
        if not origin:
            origin={}
        origin[k] = v
        self.storage[ident] = origin
    def get(self,k):
        ident = self.get_ident() # 獲取當前執行緒的唯一標記
        v= self.storage[ident].get(k)
        return v

locals_values = Local()
def func(num):
    # get_ident() 獲取當前執行緒的唯一標記
    locals_values.set('KEY',num)
    time.sleep(2)
    print(locals_values.get('KEY'),current_thread().name)

for i in range(10):
    t = Thread(target=func,args=(i,),name='執行緒%s'%i)
    t.start()

講解:

利用get_ident()獲取每個執行緒的唯一標記作為鍵,然後組織一個字典storage。

如:{執行緒1的唯一標記:{k:v},執行緒2的唯一標記:{k:v}.......}

 {
    15088: {'KEY': 0}, 
    8856: {'KEY': 1},
    17052: {'KEY': 2}, 
    8836: {'KEY': 3}, 
    13832: {'KEY': 4}, 
    15504: {'KEY': 5}, 
    16588: {'KEY': 6}, 
    5164: {'KEY': 7}, 
    560: {'KEY': 8}, 
    1812: {'KEY': 9}
                    }

執行效果:

04 通過setattr和getattr實現自定義threthreading.local

例項:

from threading import get_ident,Thread,current_thread
# get_ident()可以獲取每個執行緒的唯一標記,
import time

class Local(object):
    storage = {}# 初始化一個字典
    get_ident = get_ident # 拿到get_ident的地址

    def __setattr__(self, k, v):
        ident =self.get_ident()# 獲取當前執行緒的唯一標記
        origin = self.storage.get(ident)
        if not origin:
            origin={}
        origin[k] = v
        self.storage[ident] = origin
    def __getattr__(self, k):
        ident = self.get_ident() # 獲取當前執行緒的唯一標記
        v= self.storage[ident].get(k)
        return v

locals_values = Local()
def func(num):
    # get_ident() 獲取當前執行緒的唯一標記
    locals_values.KEY=num
    time.sleep(2)
    print(locals_values.KEY,current_thread().name)

for i in range(10):
    t = Thread(target=func,args=(i,),name='執行緒%s'%i)
    t.start()

05 每個物件有自己的儲存空間(字典)

我們可以自定義實現了threading.local的功能,但是現在存在一個問題,如果我們想生成多個Local物件,但是會導致多個Local物件所管理的執行緒設定的內容都放到了類屬性storage = {}裡面,所以我們如果想實現每一個Local物件所對應的執行緒設定的內容都放到自己的storage裡面,就需要重新設計程式碼。

例項:

from threading import get_ident,Thread,current_thread
# get_ident()可以獲取每個執行緒的唯一標記,
import time

class Local(object):
    def __init__(self):
        # 千萬不要按照註釋裡這麼寫,否則會造成遞迴死迴圈,死迴圈在__getattr__中,不理解的話可以全程使用debug測試。
        # self.storage = {}
        # self.get_ident =get_ident
        object.__setattr__(self,"storage",{})
        object.__setattr__(self,"get_ident",get_ident) # 借用父類設定物件的屬性,避免遞迴死迴圈。

    def __setattr__(self, k, v):
        ident =self.get_ident() # 獲取當前執行緒的唯一標記
        origin = self.storage.get(ident)
        if not origin:
            origin={}
        origin[k] = v
        self.storage[ident] = origin
    def __getattr__(self, k):
        ident = self.get_ident() # 獲取當前執行緒的唯一標記
        v= self.storage[ident].get(k)
        return v

locals_values = Local()
locals_values2 = Local()
def func(num):
    # get_ident() 獲取當前執行緒的唯一標記
    # locals_values.set('KEY',num)
    locals_values.KEY=num
    time.sleep(2)
    print(locals_values.KEY,current_thread().name)
    # print('locals_values2.storage:',locals_values2.storage) # 檢視locals_values2.storage的私有的storage

for i in range(10):
    t = Thread(target=func,args=(i,),name='執行緒%s'%i)
    t.start()

顯示效果我就不做演示了,和前幾個案例演示效果一樣。

06 如果是你會如何設計flask的請求併發?

  • 情況一:單程序單執行緒,基於全域性變數就可以做

  • 情況二:單程序多執行緒,基於threading.local物件做

  • 情況三:單程序多執行緒多協程,如何做?

    提示:協程屬於應用級別的,協程會替代作業系統自動切換遇到 IO的任務或者執行級別低的任務,而應用級別的切換速度遠高於作業系統的切換

    當然如果是自己來設計框架,為了提升程式的併發效能,一定是上訴的情況三,不光考慮多執行緒並且要多協程,那麼該如何設計呢?

    在我們的flask中為了這種併發需求,依賴於底層的werkzeug外部包,werkzeug實現了保證多執行緒和多協程的安全,werkzeug基本的設計理念和上一個案例一致,唯一的區別就是在匯入的時候做了一步處理,且看werkzeug原始碼。

    werkzeug.local.py部分原始碼

    ...
    
    try:
        from greenlet import getcurrent as get_ident # 拿到攜程的唯一標識
    except ImportError:
        try:
            from thread import get_ident #執行緒的唯一標識
        except ImportError:
            from _thread import get_ident
    
    class Local(object):
        ...
    
        def __init__(self):
            object.__setattr__(self, '__storage__', {})
            object.__setattr__(self, '__ident_func__', get_ident)
    
          ...
    
        def __getattr__(self, name):
            try:
                return self.__storage__[self.__ident_func__()][name]
            except KeyError:
                raise AttributeError(name)
    
        def __setattr__(self, name, value):
            ident = self.__ident_func__()
            storage = self.__storage__
            try:
                storage[ident][name] = value
            except KeyError:
                storage[ident] = {name: value}

    分析:

    原理就是在最開始匯入執行緒和協程的唯一標識的時候統一命名為get_ident,並且先匯入協程模組的時候如果報錯說明不支援協程,就會去匯入執行緒的get_ident,這樣無論是隻有執行緒執行還是協程執行都可以獲取唯一標識,並且把這個標識的執行緒或協程需要設定的內容都分類存放於__storage__字典中。