1. 程式人生 > >Python全棧開發之並發編程

Python全棧開發之並發編程

lock 作用 繼續 tor int() 進程阻塞 區別 args rmi

No.1 線程

什麽是多任務

就是操作系統可以同時運行多個任務,就是可以一邊用瀏覽器上網,同時又可以聽歌,還能再撩個×××姐,這就是多任務,操作系統會輪流把系統調度到每個核心上去執行

並發和並行

並發是指任務數多余cpu核數,通過操作系統的各種任務調度算法,實現多個任務

並行是指任務數小於cpu核數,即任務同時執行

單線程

import time

def say_hello(i):
    print(‘hello ‘, i)

if __name__ == ‘__main__‘:
    for i in range(5):
        say_hello(i)

多線程

import threading
import time

def say_hello(i):
    print(‘hello ‘, i)

if __name__ == ‘__main__‘:
    for i in range(5):
        t = threading.Thread(target=say_hello,args=(i,))
        t.start() # 當調用start方法時,才會正真的執行線程

主線程會等待子線程

import threading
import time

def say_hello(name):
    for i in range(5):
        print(‘hello ‘, i, name)

if __name__ == ‘__main__‘:
    say_hello(‘主線程‘)
    t1 = threading.Thread(target=say_hello,args=(‘t1‘,))
    t2 = threading.Thread(target=say_hello,args=(‘t2‘,))
    t1.start()
    t2.start()
    print(‘end‘)
# hello  0 主線程
# hello  1 主線程
# hello  2 主線程
# hello  3 主線程
# hello  4 主線程
# hello  0 t1
# hello  1 t1
# hello  2 t1
# hello  3 t1
# hello  4 t1
# hello  0 t2
# hello  1 t2
# hello  2 t2
# hello  3 t2
# hello  4 t2
# end

查看線程數量

import threading
import time

def say_hello(i):
    print(‘hello ‘, i)

if __name__ == ‘__main__‘:
    say_hello(‘主線程‘)
    for i in range(5):
        t = threading.Thread(target=say_hello,args=(i,))
        t.start()
        while True:
            length = len(threading.enumerate())
            print(‘當前運行的線程數為:%d‘ % length)
            if length <= 1:
                break
# hello  主線程
# hello  0
# 當前運行的線程數為:2
# 當前運行的線程數為:1
# hello  1
# 當前運行的線程數為:1
# hello  2
# 當前運行的線程數為:1
# hello  3
# 當前運行的線程數為:1
# hello  4
# 當前運行的線程數為:1

封裝線程

為了讓每個線程的封裝性更加完整,我們通常會創建一個線程類,讓這個線程類繼承自threading.Thread,然後重寫run方法就可以了

import threading
import time
class MyThread(threading.Thread):

    def run(self):
        for i in range(5):
            time.sleep(1)
            print(self.name + str(i))

if __name__ == ‘__main__‘:
    t = MyThread()
    t.start()

Python的threading.Thread類的run方法,用於定義線程的功能函數,可以在我們自己的類中覆蓋該方法,當創建自己的線程類對象後,可以start方法,進行調度

線程的執行順序

線程的執行順序是不確定的,當執行到sleep語句時,線程將會被阻塞,然後線程進入就緒狀態,等待cpu的調度,線程調度自動選擇一個線程執行

No.2 多線程

多線程共享全局變量

import threading
import time

num = 100
def demo1():
    global num
    for i in range(3):
        num -= 1
        print(‘num = ‘,num)

def demo2():
    print(‘num = ‘, num)

if __name__ == ‘__main__‘:
    print(‘線程創建之前num = ‘,num)
    t1 = threading.Thread(target=demo1)
    t1.start()
    time.sleep(1)
    t2 = threading.Thread(target=demo2)
    t2.start()
# 線程創建之前num =  100
# num =  99
# num =  98
# num =  97
# num =  97

在一個進程內的所有線程共享全局變量,能很方便的在多個線程之間共享數據,但是這也帶來一個麻煩,就是線程就全局變量的隨機修改可能會導致多線程之間對於全局變量的的混亂,即線程非安全

import threading
import time

num = 100
def demo1():
    global num
    for i in range(1000000):
        # lock.acquire()
        num += 1
        # lock.release()

def demo2():
    global num

    for i in range(1000000):
        # lock.acquire()
        num += 1
        # lock.release()

if __name__ == ‘__main__‘:
    print(‘線程創建之前num = ‘,num)
    # lock = threading.Lock()
    t1 = threading.Thread(target=demo1)
    t1.start()
    t2 = threading.Thread(target=demo2)
    t2.start()
    while len(threading.enumerate()) != 1:
        time.sleep(1)
    print(‘線程執行完畢num = ‘,num)
# 線程創建之前num =  100
# 線程執行完畢num =  1559954
兩個線程分別對線程自增了10次,結果卻是122,如果多個線程同時對同一個全局變量操作,會出現資源競爭問題,從而數據結果會不正確

同步

對於多線程非安全的問題,可以采用同步的方式來解決,每個線程對數據的修改時,都要先上鎖,處理完成後再解鎖,在上鎖的過程中不允許任何線程打擾,這樣就能保證線程安全性了,數據也不會不正確

互斥鎖

當多個線程幾乎同時修改某一個共享數據的時候,需要進程同步控制,線程同步能夠保證多個線程安全訪問競爭資源,最簡單的同步機制是引入互斥鎖,互斥鎖為資源引入一個狀態,鎖定/非鎖定,某個線程要更改共享數據時,此時資源狀態為鎖定,其他線程不能更改,當該線程修改完畢,將資源設置為非鎖定,互斥鎖保證了每次只能由一個線程進入寫入,從而保證了多線程情況下數據的正確性

# 創建鎖
lock = threading.Lock()

# 鎖定
lock.acquire()

# 釋放
lock.release()

使用互斥鎖對兩個線程對同一個全局變量各加1億次

import threading
import time

num = 100
def demo1():
    global num
    for i in range(100000000):
        lock.acquire()
        num += 1
        lock.release()

def demo2():
    global num

    for i in range(100000000):
        lock.acquire()
        num += 1
        lock.release()

if __name__ == ‘__main__‘:
    print(‘線程創建之前num = ‘,num)
    lock = threading.Lock()
    t1 = threading.Thread(target=demo1)
    t1.start()
    t2 = threading.Thread(target=demo2)
    t2.start()
    while len(threading.enumerate()) != 1:
        time.sleep(1)
    print(‘線程執行完畢num = ‘,num)
# 線程創建之前num =  100
# 線程執行完畢num =  200000100

上鎖解鎖過程

當一個線程調用所的acquire方法獲得鎖時,鎖就進入locked狀態,每次只能有一個線程的鎖,如果此時有另外一個線程試圖獲得鎖,那麽此時這個鎖就會進入阻塞狀態,直到擁有鎖的線程調用release解鎖之後,鎖進入unlocked狀態,其他線程就可以獲得鎖了

鎖的優缺點

確保了某段關鍵代碼只能由一個線程完整執行,確保了數據的完整性,阻止了多線程並發,使得包含的鎖的代碼只能以單線程執行,效率就大大降低了,還可能發生死鎖

死鎖

在線程共享多個資源的時候,如果兩個線程分別占有一部分資源並且同時等待對方的資源,就會形成死鎖

import threading
import time

class MyThread1(threading.Thread):
    def run(self):
        # lockA上鎖
        lockA.acquire()
        # 延時1秒,等待另外那個線程 把lockB上鎖
        print(self.name+‘ A start‘)
        time.sleep(1)
        # 堵塞,因為這個lockB已經被另外的線程搶先上鎖了
        lockB.acquire()
        print(self.name+‘ B end‘)
        lockB.release()
        # lockA解鎖
        lockA.release()

class MyThread2(threading.Thread):
    def run(self):
        # lockB上鎖
        lockB.acquire()
        # 延時1秒,等待另外那個線程 把lockA上鎖
        print(self.name+‘ B start‘)
        time.sleep(1)
        # 堵塞,lockA已經被另外的線程搶先上鎖了
        lockA.acquire()
        print(self.name+‘ A end‘)
        lockA.release()
        # lockB解鎖
        lockB.release()

if __name__ == ‘__main__‘:
    lockA = threading.Lock()
    lockB = threading.Lock()
    t1 = MyThread1()
    t2 = MyThread2()
    t1.start()
    t2.start()
# Thread-1 A start
# Thread-2 B start

如何避免死鎖

import threading
import time

class MyThread1(threading.Thread):
    def run(self):
        lockA.acquire()
        print(self.name, ‘A‘, ‘start‘)
        time.sleep(1)
        # 如果在規定時間內可以上鎖就返回True,反之。。。
        result = lockB.acquire(timeout=1)
        if result:
            print(self.name, ‘B‘, ‘start‘)
            lockA.release()
            print(self.name, ‘A‘, ‘end‘)
            lockB.release()
            print(self.name, ‘B‘, ‘end‘)
        else:
            lockA.release()

class MyThread2(threading.Thread):
    def run(self):
        lockB.acquire()
        print(self.name, ‘B‘, ‘start‘)
        time.sleep(1)
        lockA.acquire()
        print(self.name, ‘A‘, ‘start‘)
        lockA.release()
        print(self.name, ‘A‘, ‘end‘)
        lockB.release()
        print(self.name, ‘B‘, ‘end‘)

if __name__ == ‘__main__‘:
    lockA = threading.Lock()
    lockB = threading.Lock()
    t1 = MyThread1()
    t2 = MyThread2()
    t1.start()
    t2.start()

GIL

Python語言和GIL沒有半毛錢關系,僅僅是由於歷史原因在Cpython虛擬機,難以移除GIL

GIL,全局解釋器鎖,每個線程在執行的過程都需要先獲取GIL,保證同一時刻只有一個線程可以執行代碼

線程釋放GIL鎖的情況: 在IO操作等可能會引起阻塞的system call之前,可以暫時釋放GIL,但在執行完畢後,必須重新獲取GIL

Python使用多進程是可以利用多核的CPU資源的

多線程爬取比單線程性能有提升,因為遇到IO阻塞會自動釋放GIL鎖

No.3 進程

什麽是進程?

一個程序在運行期間,代碼和程序運行所需的資源稱為進程

進程的狀態

工作中,任務數往往大於cpu核心數,所以一定有一些任務在執行,另外一部分是處於等待狀態
就緒態,運行的條件已經滿足,等待cpu執行
執行態,cpu正在執行該任務
等待態,等待某些條件滿足

創建進程

from multiprocessing import Process
import time

def run_proc():
    while True:
        print(‘子線程‘)
        time.sleep(1)

if __name__ == ‘__main__‘:
    p = Process(target=run_proc)
    p.start()
    while True:
        print(‘主線程‘)
        time.sleep(1)

pid

from multiprocessing import Process
import time
import os
def run_proc():
    # 獲取當前進程的pid
    print(‘子進程 pid = ‘,os.getpid())

if __name__ == ‘__main__‘:
    print(‘父進程 pid = ‘,os.getpid())
    p = Process(target=run_proc)
    p.start()

Process語法結構

Process([group [, target [, name [, args [, kwargs]]]]])
target 如果傳遞了函數的引用,可以讓子進程執行內部代碼
args 給target指定的函數傳遞參數,元組方式
kwargs 給target指定的函數傳遞命名參數
name 給進程設定一個名字
group 指定進程組
常用方法
start() 啟動子進程
is_alive() 判斷進程是否還在運行
join([timeout]) 是否等待子進程執行結束
terminate() 不管任務是否執行完畢,直接結束

進程不能共享全局變量

from multiprocessing import Process
import time
NUM_LIST = [11,22,33]

def demo1(num_list):
    num_list.append(44)
    print(num_list)

def demo2(num_list):
    num_list.append(55)
    print(num_list)

if __name__ == ‘__main__‘:
    p1 = Process(target=demo1,args=(NUM_LIST,))
    p2 = Process(target=demo2,args=(NUM_LIST,))
    p1.start()
    p2.start()
    print(NUM_LIST)
# [11, 22, 33]
# [11, 22, 33, 44]
# [11, 22, 33, 55]

進程、線程對比

定義

進程,能夠完成多任務,例如,在一臺電腦上登錄多個QQ客戶端

線程,能夠完成多任務,例如,在一臺電腦上和多個妹子聊天

區別

一個程序至少有一個進程,一個進程至少有一個線程,線程的劃分尺度小於進程,是的多線程的並發高於多進程,進程在執行過程中擁有獨立的內存單元,而線程卻是共享的,線程的運行開銷小,但是不安全,進程和它相反,所以我們要根據不同的場景選擇適合的

進程池

當需要創建的線程數量不多時,我們可以直接利用Process動態生成進程,但是當進程數量上百甚至上千時,我們再采用Process創建進程就可以猝死了,此時可以使用pool,初始化pool時,可以指定一個最大進程數,當有新的請求提交到pool時,如果線程池沒有滿,那麽會創建一個新的線程來執行該請求,否則,等待其他進程結束

from multiprocessing import Pool
import time

def func(arg):
  print(arg)
  time.sleep(1)

if __name__ == ‘__main__‘:
  pool = Pool(5)
  for i in range(30):
      pool.apply_async(func=func,args=(i,))
  pool.close() # 所有任務執行完畢
  pool.join() # 等待pool中所有子進程執行完成,必須放在close語句之後

pool函數解析

apply_async(func[, args[, kwds]]) :使用非阻塞方式調用func(並行執行,堵塞方式必須等待上一個進程退出才能執行下一個進程),args為傳遞給func的參數列表,kwds為傳遞給func的關鍵字參數列表
close():關閉Pool,使其不再接受新的任務;
terminate():不管任務是否完成,立即終止;
join():主進程阻塞,等待子進程的退出, 必須在close或terminate之後使用;

進程池中的queue

from multiprocessing import Pool,Manager
import time
import random

def write(q):
    for i in [11,22,33]:
        if not q.full():
            q.put(i)
            print(‘put %s to queue‘ %i)
            time.sleep(random.random())

def read(q):
    while True:
        if not q.empty():
            value = q.get()
            print(‘get %s to queue‘ %value)
            time.sleep(random.random())
        else:
            break

if __name__ == ‘__main__‘:
    q = Manager().Queue()
    pool = Pool()
    pool.apply_async(write,args=(q,))
    # time.sleep(1)
    pool.apply_async(read,args=(q,))
    pool.close()
    pool.join()

進程間通信

進程間有時也需要通信,可以使用multiprocessing模塊的Queue實現

初始化Queue()對象時,若括號中沒有指定最大可接收的消息數量,或數量為負值,那麽就代表可接受的消息數量沒有上限
Queue.qsize() 返回當前隊列包含的消息數量
Queue.empty() 如果隊列為空,返回True,反之False
Queue.full() 如果隊列滿了,返回True,反之False
Queue.get([block[, timeout]]) 獲取隊列中的一條消息,然後將其從列隊中移除,block默認值為True,如果block使用默認值,且沒有設置timeout,消息列隊如果為空,此時程序將被阻塞,直到從消息列隊讀到消息為止,如果設置了timeout,則會等待timeout秒,若還沒讀取到任何消息,則出"Queue.Empty"異常,如果block值為False,消息列隊如果為空,則會立刻拋出"Queue.Empty"異常
Queue.get_nowait() 相當Queue.get(False)
Queue.put(item,[block[, timeout]] 將item消息寫入隊列,block默認值為True,如果block使用默認值,且沒有設置timeout,消息列隊如果已經沒有空間可寫入,此時程序將被阻塞,直到從消息列隊騰出空間為止,如果設置了timeout,則會等待timeout秒,若還沒空間,則拋出"Queue.Full"異常,如果block值為False,消息列隊如果沒有空間可寫入,則會立刻拋出"Queue.Full"異常
Queue.put_nowait(item) 相當Queue.put(item, False)

栗子

from multiprocessing import Process,Queue
import time
import random

def write(q):
    for i in [11,22,33]:
        if not q.full():
            q.put(i)
            print(‘put %s to queue‘ %i)
            time.sleep(random.random())

def read(q):
    while True:
        if not q.empty():
            value = q.get()
            print(‘get %s to queue‘ %value)
            time.sleep(random.random())
        else:
            break

if __name__ == ‘__main__‘:
    q = Queue()
    t1 = Process(target=write,args=(q,))
    t2 = Process(target=read,args=(q,))
    t1.start()
    t1.join()
    t2.start()
    t2.join()

No.4 叠代器

叠代是遍歷序列的一種方式,它可以記住序列的遍歷位置,叠代器從第一個元素開始訪問,只能向前,不能後退

可叠代對象

可以通過for...in...這類語句叠代的對象稱為可叠代對象

如何判斷一個對象是否可以叠代

可以使用inistance(obj,Iterable)判斷一個對象是不是iterable對象

可叠代對象的本質

可叠代對象進行叠代的時候,我們發現沒叠代一次,都會返回對象的中的下一條數據,一直往後讀取數據直到數據全部叠代完成,那麽,這個負責記錄數據叠代到的索引的機制叫做叠代器,可叠代對象通過iter方法向我們提供一個叠代器,我們在叠代對象的時候,實際上就是調用該方法獲取了一個叠代器,然後根據叠代器來獲取數據的,也就說,具有iter方法 的對象稱為可叠代對象

from collections import Iterable

class MyInt(int):
    def __iter__(self):
        pass

if __name__ == ‘__main__‘:
    myint = MyInt()
    print(isinstance(myint, Iterable)) # True

iter函數與next函數

可叠代對象通過iter函數獲取叠代器,我們可以對獲取到的叠代器不停的使用next函數來獲取下一條數據,當我們對叠代器使用iter函數就是調用了可叠代對象的iter函數,註意,當我們叠代玩最後一個數據時,再次調用next函數會拋出StopIterable異常

叠代器iterable

當我們對叠代器使用next方法的時候,實際上時調用的next函數(Python2是next函數),所以,想構建一個叠代器,就要實現它的nextiter函數,實現了這兩個函數,就是叠代器

class MyIterator(object):
    """自定義的供上面可叠代對象使用的一個叠代器"""
    def __init__(self):
        self.items = []
        self.current = 0 # current用來記錄當前訪問到的位置
    def add(self,value):
        self.items.append(value)

    def __next__(self):
        if self.current < len(self.items):
            item = self.items[self.current]
            self.current += 1
            return item
        else:
            raise StopIteration

    def __iter__(self):
        return self

if __name__ == ‘__main__‘:
    mi = MyIterator()
    mi.add(1)
    mi.add(2)
    mi.add(3)
    mi.add(4)
    mi.add(5)
    for num in mi:
        print(num)

for...in...本質

本質就是先通過iter獲取叠代器,在通過叠代器不斷的調用next方法,當遇到異常退出

叠代器應用場景

每次返回的數據不是在一個已有的數據集合中讀取,而是程序通過一定的規律計算生成的,也就是說不用將所有要叠代的數據一次存儲下來提供後續讀取,這樣可以節省很大空間

class FibIterator(object):
    def __init__(self, n):
        self.n = n
        self.current = 0
        self.num1 = 0
        self.num2 = 1

    def __next__(self):
        if self.current < self.n:
            num = self.num1
            self.num1, self.num2 = self.num2, self.num1+self.num2
            self.current += 1
            return num
        else:
            raise StopIteration

    def __iter__(self):
        return self

if __name__ == ‘__main__‘:
    fib = FibIterator(10)
    for num in fib:
        print(num, end=" ")

No.5 生成器

生成器

一邊循環一邊計算的機制,稱為生成器,生成器是一種特殊的叠代器

創建生成器

將列表生成式的定界符改成()

G = ( x*2 for x in range(5))
G
<generator object <genexpr> at 0x000001E86BC993B8>

創建生成式和生成器的區別僅在於定界符,L是一個列表,G是一個生成器,我們可以直接打印出列表的每個元素,而對於生成其,我們可以按照叠代器的使用方法來使用

使用yield

def fib(n):
    current = 0
    num1, num2 = 0, 1
    while current < n:
        num = num1
        num1, num2 = num2, num1+num2
        current += 1
        yield num

if __name__ == ‘__main__‘:
    f = fib(10)
    for i in f:
        print(i)

使用了yield關鍵字的函數就是生成器,yield的作用有兩點,一是保存當前運行狀態,暫停執行,掛起生成器,二是將yield後面的表達式的值作為返回值返回,可以使用next函數讓生成器從斷點處繼續執行,喚醒生成器

使用send喚醒

我們除了可以使用next函數來喚醒生成器,還可以使用send函數,使用send還可以在喚醒的同時從間斷點傳入一個附加數據

def fib(n):
    current = 0
    num1, num2 = 0, 1
    while current < n:
        num = num1
        num1, num2 = num2, num1+num2
        current += 1
        yield num

if __name__ == ‘__main__‘:
    f = fib(10)
    print(next(f))
    print(f.send(‘...‘))

No.7 協程

協程概念

協程是什麽

協程是Python中另外一種實現多任務的方式,只不是比比線程更小的執行單元,協程自帶CPU上下文,只要在合適的時機,我們可以把一個協程切換到另一個協程,這要在這個過程中保存或恢復CPU上下文那麽程序還是可以運行的,說到這,小夥伴們是不是想到了上文介紹的yield

協程和線程差異

在實現多任務時, 線程切換從系統層面遠不止保存和恢復 CPU上下文這麽簡單。 操作系統為了程序運行的高效性每個線程都有自己緩存Cache等等數據,操作系統還會幫你做這些數據的恢復操作。 所以線程的切換非常耗性能。但是協程的切換只是單純的操作CPU的上下文,所以一秒鐘切換個上百萬次系統都抗的住

簡單實現協程

import time

def work1():
    while True:
        print("----work1---")
        yield
        time.sleep(0.5)

def work2():
    while True:
        print("----work2---")
        yield
        time.sleep(0.5)

def main():
    w1 = work1()
    w2 = work2()
    while True:
        next(w1)
        next(w2)

if __name__ == "__main__":
    main()

greenlet

安裝greenlet pip3 install greenlet

from greenlet import greenlet
import time

def demo1():
    while True:
        print(‘Demo1 is running‘)
        gl2.switch()
        time.sleep(1)

def demo2():
    while True:
        print(‘Demo2 is running‘)
        gl1.switch()
        time.sleep(1)

if __name__ == ‘__main__‘:
    gl1 = greenlet(demo1)
    gl2 = greenlet(demo2)
    gl1.switch() # 切換到gl1執行

gevent

交替運行

import gevent

def demo():
    for i in range(10):
        print(gevent.getcurrent(),i)

if __name__ == ‘__main__‘:
    g1 = gevent.spawn(demo,)
    g2 = gevent.spawn(demo,)
    g3 = gevent.spawn(demo,)
    g4 = gevent.spawn(demo,)
    g1.join()
    g2.join()
    g3.join()
    g4.join()

自動切換

import gevent

def demo():
    for i in range(10):
        print(gevent.getcurrent(),i)
        gevent.sleep(1)

if __name__ == ‘__main__‘:
    g1 = gevent.spawn(demo,)
    g2 = gevent.spawn(demo,)
    g3 = gevent.spawn(demo,)
    g4 = gevent.spawn(demo,)
    g1.join()
    g2.join()
    g3.join()
    g4.join()

No.7 線程、進程、協程區別

進程是資源分配的單位

線程是操作系統調度的單位

進程切換需要的資源很最大,效率很低

線程切換需要的資源一般,效率一般(當然了在不考慮GIL的情況下)

協程切換任務資源很小,效率高

多進程、多線程根據cpu核數不一樣可能是並行的,但是協程是在一個線程中,所以是並發

Python全棧開發之並發編程