1. 程式人生 > >併發並行,同步非同步,同步鎖,遞迴鎖,同步條件(event),訊號量(Semaphore),佇列(queue),生產者消費者

併發並行,同步非同步,同步鎖,遞迴鎖,同步條件(event),訊號量(Semaphore),佇列(queue),生產者消費者

併發&並行

併發:是指系統具有處理 多個任務(動作)的能力(分著切換進行)。一個cpu就能實現併發,一邊聽歌一邊打遊戲

並行:是指系統具有同時處理 多個任務(唯一的時刻,同一時刻)。多核(4核處理4個任務)

並行是併發的子集

同步&非同步

同步:當程序執行到一個I/O操作的時候(等待外部資料的時候)你,------等:同步(你打手機電話,你要一直等接聽,途中沒網)

非同步:                                                                                              ,--------不等:可以做其他事,一直等到資料接收成功,再回來處理(打QQ電話可以刷微博)

同步鎖:控制執行緒只能有一個進入cpu

注:time.sleep()相當於執行一次I/O操作

多個執行緒都在同時操作同一個共享資源,所以造成了資源破壞,我們可以通過同步鎖來解決這種問題

import time
import threading

def addNum():
    global num #在每個執行緒中都獲取這個全域性變數
    #num-=1

    temp=num
    #print('--get num:',num )
    time.sleep(0.1)
    num =temp-1 #對此公共變數進行-1操作

num = 100  #設定一個共享變數
thread_list = []
for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list: #等待所有執行緒執行完畢
    t.join()

print('final num:', num )

ime.sleep(0.1)  /0.001/0.0000001 結果是不一樣的 

import time
import threading

def addNum():
    global num #在每個執行緒中都獲取這個全域性變數
    r.acquire#接受
    #num-=1

    temp=num
    #print('--get num:',num )
    time.sleep(0.1)
    num =temp-1 #對此公共變數進行-1操作

    r.release()#釋放

num = 100  #設定一個共享變數
r=threading.Lock()#例項
thread_list = []
for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list: #等待所有執行緒執行完畢
    t.join()

print('final num:', num )

同步鎖容易造成執行緒死鎖!線上程間共享多個資源的時候,如果兩個執行緒分別佔有一部分資源並且同時等待對方的資源,就會造成死鎖,因為系統判斷這部分資源都正在使用,所有這兩個執行緒在無外力作用下將一直等待下去。例:

import threading,time

class myThread(threading.Thread):
    def doA(self):
        lockA.acquire()
        print(self.name,"gotlockA",time.ctime())
        time.sleep(3)
        lockB.acquire()
        print(self.name,"gotlockB",time.ctime())
        lockB.release()
        lockA.release()

    def doB(self):
        lockB.acquire()
        print(self.name,"gotlockB",time.ctime())
        time.sleep(2)
        lockA.acquire()
        print(self.name,"gotlockA",time.ctime())
        lockA.release()
        lockB.release()

    def run(self):
        self.doA()
        self.doB()
if __name__=="__main__":

    lockA=threading.Lock()
    lockB=threading.Lock()
    threads=[]
    for i in range(5):
        t=myThread()
        t.start()
        threads.append(t)
   
    for t in threads:
        t.join()    
    print('ending')#容易造成死鎖
import threading,time
#當count>0時,只能一個執行緒執行,其他執行緒進不來
class myThread(threading.Thread):
    def doA(self):
        r_lock.acquire()#count=1
        print(self.name,"gotlockA",time.ctime())
        time.sleep(3)
        r_lock.acquire()#count=2
        print(self.name,"gotlockB",time.ctime())
        r_lock.release()#count=1
        r_lock.release()#count=0


    def doB(self):
        r_lock.acquire()
        print(self.name,"gotlockB",time.ctime())
        time.sleep(2)
        r_lock.acquire()
        print(self.name,"gotlockA",time.ctime())
        r_lock.release()
        r_lock.release()

    def run(self):
        self.doA()
        self.doB()
if __name__=="__main__":

   # lockA=threading.Lock()
    #lockB=threading.Lock()
    threads=[]
    r_lock=threading.RLock()#就單單建立一個特別的鎖threading.RLock()
    for i in range(5):
        t=myThread()
        t.start()
        threads.append(t)
   
    for t in threads:
        t.join()    
    print('ending')

為了支援在同一執行緒中多次請求同一資源,python提供了“可重入鎖”:threading.RLock。RLock內部維護著一個Lock和一個counter變數,counter記錄了acquire的次數,從而使得資源可以被多次acquire。直到一個執行緒所有的acquire都被release,其他的執行緒才能獲得資源。

An event is a simple synchronization object;the event represents an internal flag,

and threads can wait for the flag to be set, or set or clear the flag themselves.

event = threading.Event() # a client thread can wait for the flag to be set event.wait() # a server thread can set or reset it event.set() event.clear()

If the flag is set, the wait method doesn’t do anything. If the flag is cleared, wait will block until it becomes set again. Any number of threads may wait for the same event.

import threading,time
class Boss(threading.Thread):
    def run(self):
        print("BOSS:今晚大家都要加班到22:00。")
        print(event.isSet())
        event.set()#set()設立後wait()就不起任何作用了
        time.sleep(5)
        print("BOSS:<22:00>可以下班了。")
        print(event.isSet())
        event.set()
class Worker(threading.Thread):
    def run(self):
        event.wait()
        print("Worker:哎……命苦啊!")
        time.sleep(1)
        event.clear()#clear()後wait()又起作用,等待set()才執行
        event.wait()
        print("Worker:OhYeah!")
if __name__=="__main__":
    event=threading.Event()
    threads=[]
    for i in range(5):
        threads.append(Worker())
    threads.append(Boss())
    for t in threads:
        t.start()
    for t in threads:
        t.join()

訊號量:可以指定若干兩執行緒進入cpu(同步)

訊號量用來控制執行緒併發數的,BoundedSemaphore或Semaphore管理一個內建的計數器,每當呼叫acquire()時-1,呼叫release()時+1。計數器不能小於0,當計數器為 0時,acquire()將阻塞執行緒至同步鎖定狀態,直到其他執行緒呼叫release()。(類似於停車位的概念)BoundedSemaphore與Semaphore的唯一區別在於前者將在呼叫release()時檢查計數器的值是否超過了計數器的初始值,如果超過了將丟擲一個異常。


import threading,time
class myThread(threading.Thread):
    def run(self):#最後結果就是5個一輸出睡5秒然後再5個一輸出
        if semaphore.acquire():
            print(self.name)
            time.sleep(5)
            semaphore.release()
if __name__=="__main__":
    semaphore=threading.Semaphore(5)#設定訊號量為5
    thrs=[]
    for i in range(100):
        thrs.append(myThread())
    for t in thrs:
        t.start()

佇列:queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

建立一個“佇列”物件 import Queue q = Queue.Queue(maxsize = 10) Queue.Queue類即是一個佇列的同步實現。佇列長度可為無限或者有限。可通過Queue的建構函式的可選引數maxsize來設定佇列長度。如果maxsize小於1就表示佇列長度無限。

將一個值放入佇列中 q.put(10) 呼叫佇列物件的put()方法在隊尾插入一個專案。put()有兩個引數,第一個item為必需的,為插入專案的值;第二個block為可選引數,預設為 1。如果隊列當前為空且block為1,put()方法就使呼叫執行緒暫停,直到空出一個數據單元。如果block為0,put方法將引發Full異常。

將一個值從佇列中取出 q.get() 呼叫佇列物件的get()方法從隊頭刪除並返回一個專案。可選引數為block,預設為True。如果佇列為空且block為True, get()就使呼叫執行緒暫停,直至有專案可用。如果佇列為空且block為False,佇列將引發Empty異常。

Python Queue模組有三種佇列及建構函式: 1、Python Queue模組的FIFO佇列先進先出。   class queue.Queue(maxsize) 2、LIFO類似於堆,即先進後出。               class queue.LifoQueue(maxsize) 3、還有一種是優先順序佇列級別越低越先出來。        class queue.PriorityQueue(maxsize)

#優先順序
# q=queue.PriorityQueue()
# q.put([5,100])
# q.put([7,200])
# q.put([3,"hello"])
# q.put([4,{"name":"alex"}])

# while 1:
#     data=q.get()
      print(data)#---->[3,"hello"]
#     print(data[1])----->["hello"]
#     print("----------")

此包中的常用方法(q = Queue.Queue()): q.qsize() 返回佇列的大小 q.empty() 如果佇列為空,返回True,反之False q.full() 如果佇列滿了,返回True,反之False q.full 與 maxsize 大小對應 q.get([block[, timeout]]) 獲取佇列,timeout等待時間 q.get_nowait() 相當q.get(False) 非阻塞 q.put(item) 寫入佇列,timeout等待時間 q.put_nowait(item) 相當q.put(item, False) q.task_done() 在完成一項工作之後,q.task_done() 函式向任務已經完成的佇列傳送一個訊號 q.join() 實際上意味著等到佇列為空,再執行別的操作

q.task_done()與q.join()應同時出現,你發我收,然後join()收到訊號後繼續接下來的事

生產者消費者模型:

為什麼要使用生產者和消費者模式

線上程世界裡,生產者就是生產資料的執行緒,消費者就是消費資料的執行緒。在多執行緒開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產資料。同樣的道理,如果消費者的處理能力大於生產者,那麼消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。

什麼是生產者消費者模式

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞佇列來進行通訊,所以生產者生產完資料之後不用等待消費者處理,直接扔給阻塞佇列,消費者不找生產者要資料,而是直接從阻塞佇列裡取,阻塞佇列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。

這就像,在餐廳,廚師做好菜,不需要直接和客戶交流,而是交給前臺,而客戶去飯菜也不需要不找廚師,直接去前臺領取即可,這也是一個結耦的過程。

中間介質:佇列

import time,random
import queue,threading

q = queue.Queue()

def Producer(name):
  count = 0
  while count <10:
    print("making........")
    time.sleep(5)
    q.put(count)
    print('Producer %s has produced %s baozi..' %(name, count))
    count +=1
    #q.task_done()
    q.join()
    print("ok......")

def Consumer(name):
  count = 0
  while count <10:
        time.sleep(random.randrange(4))
    # if not q.empty():
    #     print("waiting.....")
        #q.join()
        data = q.get()
        print("eating....")
        time.sleep(4)

        q.task_done()
        #print(data)
        print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data))
    # else:
    #     print("-----no baozi anymore----")
        count +=1

p1 = threading.Thread(target=Producer, args=('A君',))
c1 = threading.Thread(target=Consumer, args=('B君',))
c2 = threading.Thread(target=Consumer, args=('C君',))
c3 = threading.Thread(target=Consumer, args=('D君',))

p1.start()
c1.start()
c2.start()
c3.start()