1. 程式人生 > >Python之路(第三十八篇) 併發程式設計:程序同步鎖/互斥鎖、訊號量、事件、佇列、生產者消費者模型

Python之路(第三十八篇) 併發程式設計:程序同步鎖/互斥鎖、訊號量、事件、佇列、生產者消費者模型

一、程序鎖(同步鎖/互斥鎖)

程序之間資料不共享,但是共享同一套檔案系統,所以訪問同一個檔案,或同一個列印終端,是沒有問題的,

而共享帶來的是競爭,競爭帶來的結果就是錯亂,如何控制,就是加鎖處理。

例子

  #併發執行,效率高,但競爭同一列印終端,帶來了列印錯亂
  from multiprocessing import Process
  import os,time
  def work():
      print('%s is running' %os.getpid())
      time.sleep(2)
      print('%s is done' %os.getpid())
  ​
  if __name__ == '__main__':
      for i in range(3):
          p=Process(target=work)
          p.start()

  

 

加鎖後

  
  #加鎖後由併發變成了序列,犧牲了執行效率,但避免了競爭
  ​
  from multiprocessing import Process,Lock
  import os,time
  def work(mutex):
      mutex.acquire() #開始加鎖
      print('%s is running' %os.getpid())
      time.sleep(2)
      print('%s is done' %os.getpid())
      mutex.release() #釋放鎖,在加鎖期間別的程序都要等
  ​
  if __name__ == '__main__':
      mutex = Lock()
      for i in range(3):
          p=Process(target=work,args=(mutex,))
          p.start()

  

 

例子2

多個程序共享同一檔案

檔案當資料庫,模擬搶票

 

未加鎖版

  
  #檔案db.txt的內容為:{"count":1}
  #注意一定要用雙引號,不然json無法識別
  ​
  # 併發執行,效率高,但競爭寫同一檔案,資料寫入錯亂
  from multiprocessing import Process,Lock
  import time,json,random,os
  def search():
      dic=json.load(open('db.txt'))
      print('\033[43m剩餘票數%s\033[0m' %dic['count'])
  ​
  def get():
      dic=json.load(open('db.txt'))
      time.sleep(0.1) #模擬讀資料的網路延遲
      if dic['count'] >0:
          dic['count']-=1
          time.sleep(0.2) #模擬寫資料的網路延遲
          json.dump(dic,open('db.txt','w'))
          print('%s\033[43m購票成功\033[0m'%(os.getpid()))
  ​
  def task(lock):
      search()
      get()
  if __name__ == '__main__':
      lock=Lock()
      for i in range(10): #模擬併發10個客戶端搶票
          p=Process(target=task,args=(lock,))
          p.start()

  

輸出結果

  
  剩餘票數1
  剩餘票數1
  剩餘票數1
  剩餘票數1
  剩餘票數1
  剩餘票數1
  剩餘票數1
  剩餘票數1
  剩餘票數1
  剩餘票數1
  4120購票成功
  2692購票成功
  7328購票成功
  13444購票成功
  13632購票成功
  13560購票成功
  13752購票成功
  12564購票成功
  13720購票成功
  13488購票成功

  

加鎖版

 

  
  import multiprocessing,time,json,random
  ​
  def search(name):
      with open("db.txt","r",encoding="utf-8") as f:
          data_dic = json.load(f)
          time.sleep(random.uniform(0,2))
          if data_dic["count"] >= 1 :
              print("已查詢到票還有%s張,當前系統時間 %s"%(data_dic["count"],time.asctime()))
          else:
              print("系統票源不足!當前系統時間 %s"%time.asctime())
  ​
  def buy(name):
      with open("db.txt","r+",encoding="utf-8") as f:
          data_dic = json.load(f)
      if data_dic["count"] > 0 :
          with open("db.txt", "w", encoding="utf-8") as g:
              new_ticket_count = data_dic["count"] - 1
              data_dic.update({"count":new_ticket_count})
              json.dump(data_dic,g)
          print("%s購票成功!"%name)
      else:
          print("%s購票失敗!"%name)
  ​
  def task(name,mutex):
      search(name)  # 查詢無需加鎖
      mutex.acquire()
      buy(name)  #針對修改檔案的關鍵操作加鎖
      mutex.release()
  ​
  ​
  if __name__ == "__main__":
      mutex = multiprocessing.Lock()
      for i in range(10):
          p = multiprocessing.Process(target=task,args=("乘客%s"%i,mutex))
          p.start()
 

  

分析

  
  #加鎖可以保證多個程序修改同一塊資料時,同一時間只能有一個任務可以進行修改,即序列的修改,沒錯,速度是慢了,但犧牲了速度卻保證了資料安全。
  雖然可以用檔案共享資料實現程序間通訊,但問題是:
  1.效率低(共享資料基於檔案,而檔案是硬碟上的資料)
  2.需要自己加鎖處理
  ​
  ​
  ​
  #因此我們最好找尋一種解決方案能夠兼顧:1、效率高(多個程序共享一塊記憶體的資料)2、幫我們處理好鎖問題。這就是mutiprocessing模組為我們提供的基於訊息的IPC通訊機制:佇列和管道。
  1 佇列和管道都是將資料存放於記憶體中
  2 佇列又是基於(管道+鎖)實現的,可以讓我們從複雜的鎖問題中解脫出來,
  我們應該儘量避免使用共享資料,儘可能使用訊息傳遞和佇列,避免處理複雜的同步和鎖問題,而且在程序數目增多時,往往可以獲得更好的可獲展性。

  

 

二、訊號量(multiprocess.Semaphore)

互斥鎖同時只允許一個執行緒更改資料,而訊號量Semaphore是同時允許一定數量的執行緒更改資料 。實現:訊號量同步基於內部計數器,每呼叫一次acquire(),計數器減1;每呼叫一次release(),計數器加1.當計數器為0時,acquire()呼叫被阻塞。這是迪科斯徹(Dijkstra)訊號量概念P()和V()的Python實現。訊號量同步機制適用於訪問像伺服器這樣的有限資源。訊號量與程序池的概念很像,但是要區分開,訊號量涉及到加鎖的概念。

例子

  
  # 多程序中的元件
  # ktv
  # 4個
  # 一套資源  同一時間 只能被n個人訪問
  # 某一段程式碼 同一時間 只能被n個程序執行
  import time
  import random
  from multiprocessing import Process
  from multiprocessing import Semaphore
  ​
  # sem = Semaphore(4)
  # sem.acquire()
  # print('拿到第一把鑰匙')
  # sem.acquire()
  # print('拿到第二把鑰匙')
  # sem.acquire()
  # print('拿到第三把鑰匙')
  # sem.acquire()
  # print('拿到第四把鑰匙')
  # sem.acquire()
  # print('拿到第五把鑰匙')
  def ktv(i,sem):
      sem.acquire()    #獲取鑰匙
      print('%s走進ktv'%i)
      time.sleep(random.randint(1,5))
      print('%s走出ktv'%i)
      sem.release()
  ​
  ​
  if __name__ == '__main__' :
      sem = Semaphore(4)
      for i in range(20):
          p = Process(target=ktv,args=(i,sem))
          p.start()

  

 

三、事件(multiprocess.Event)

python執行緒的事件用於主執行緒控制其他執行緒的執行,事件主要提供了三個方法 set、wait、clear。

事件處理的機制:全域性定義了一個“Flag”,如果“Flag”值為 False,那麼當程式執行 event.wait 方法時就會阻塞,如果“Flag”值為True,那麼event.wait 方法時便不再阻塞。

clear:將“Flag”設定為False,set:將“Flag”設定為True.

 

例子

  
  from multiprocessing import Event
  ​
  e = Event()
  print(e.is_set()) #初始設定為False
  print("資料111")
  e.set()  #設定之後為True
  print("資料222")
  print(e.is_set()) #列印設定之後的狀態
  e.wait()  #當值為False會阻塞,當值為Ture是,不會阻塞
  print("資料333")
  e.clear() #清除事件狀態,設定為False
  print(e.is_set())  #列印清除之後的狀態
  print("資料444")
  e.wait()  #此時值為False,程式會一直阻塞
  print("資料555")

  

 

輸出結果

  
  False
  資料111
  資料222
  True
  資料333
  False
  資料444

  

 

例子

簡單的紅綠燈事件

  
  from multiprocessing import Event,Process
  import time
  import random
  ​
  ​
  def cars(e,num):
      if not e.is_set(): # 程序剛開啟,is_set()的值是False,模擬訊號燈為紅色
          print("%s車正在等待通行"%num)
          e.wait() # 阻塞,等待訊號燈切換
      print("%s車已經通過" % num) #列印已經通過的程序
  ​
  ​
  def light(e):
  ​
      #模擬定時切換紅綠燈
      while True:
          if e.is_set():
              e.clear() #>將is_set()的值設定為False
              print("\033[31m紅燈亮了\033[0m")
          else:
              e.set() #>將is_set()的值設定為True
              print("\033[32m綠燈亮了\033[0m")
          time.sleep(2)
  ​
  if __name__ == "__main__":
      e = Event()
      traffic = Process(target=light,args=(e,))
      traffic.start() #啟動紅綠燈程序
      for i in range(20):
          car = Process(target=cars,args=(e,"布加迪%s"%i))
          car.start()
          time.sleep(random.random())

  

 

四、程序間通訊——佇列和管道

程序彼此之間互相隔離,要實現程序間通訊(IPC),multiprocessing模組支援兩種形式:佇列和管道,這兩種方式都是使用訊息傳遞的

佇列

 佇列就相當於一個容器,裡面可以放資料,特點是先放進去先拿出來,即先進先出。

建立佇列的類(底層就是以管道和鎖定的方式實現)

  
  Queue([maxsize]):建立共享的程序佇列,Queue是多程序安全的佇列,可以使用Queue實現多程序之間的資料傳遞。 

  

引數

  
  maxsize是佇列中允許最大項數,省略則無大小限制。  

  

  方法介紹:

  
  q.put方法用以插入資料到佇列中,put方法還有兩個可選引數:blocked和timeout。如果blocked為True(預設值),並且timeout為正值,該方法會阻塞timeout指定的時間,直到該佇列有剩餘的空間。如果超時,會丟擲Queue.Full異常。如果blocked為False,但該Queue已滿,會立即丟擲Queue.Full異常。
  q.get方法可以從佇列讀取並且刪除一個元素。同樣,get方法有兩個可選引數:blocked和timeout。如果blocked為True(預設值),並且timeout為正值,那麼在等待時間內沒有取到任何元素,會丟擲Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果佇列為空,則立即丟擲Queue.Empty異常.
   
  q.get_nowait():同q.get(False)
  q.put_nowait():同q.put(False)
  ​
  q.empty():呼叫此方法時q為空則返回True,該結果不可靠,比如在返回True的過程中,如果佇列中又加入了專案。
  q.full():呼叫此方法時q已滿則返回True,該結果不可靠,比如在返回True的過程中,如果佇列中的專案被取走。
  q.qsize():返回佇列中目前專案的正確數量,結果也不可靠,理由同q.empty()和q.full()一樣

  

其他方法(瞭解):

  
  q.close() 
  關閉佇列,防止佇列中加入更多資料。呼叫此方法時,後臺執行緒將繼續寫入那些已入佇列但尚未寫入的資料,但將在此方法完成時馬上關閉。如果q被垃圾收集,將自動呼叫此方法。關閉佇列不會在佇列使用者中生成任何型別的資料結束訊號或異常。例如,如果某個使用者正被阻塞在get()操作上,關閉生產者中的佇列不會導致get()方法返回錯誤。
  ​
  q.cancel_join_thread() 
  不會再程序退出時自動連線後臺執行緒。這可以防止join_thread()方法阻塞。
  ​
  q.join_thread() 
  連線佇列的後臺執行緒。此方法用於在呼叫q.close()方法後,等待所有佇列項被消耗。預設情況下,此方法由不是q的原始建立者的所有程序呼叫。呼叫q.cancel_join_thread()方法可以禁止這種行為。

  

例子

  from multiprocessing import Queue
  ​
  q = Queue(3)  # 建立一個佇列物件,並給他設定容器大小,即能放幾個資料
  q.put(1)  # put()方法是往容器裡放資料
  q.put([2,3])
  q.put({"k1":4})
  # q.put("mi") # 如果佇列已經滿了,程式就會停在這裡,等待資料被別人取走,再將資料放入佇列。
  try:
      q.put_nowait(3) # 可以使用put_nowait,如果佇列滿了不會阻塞,但是會因為佇列滿了而報錯。
  except: # 因此我們可以用一個try語句來處理這個錯誤。這樣程式不會一直阻塞下去,但是會丟掉這個訊息。
      print('佇列已經滿了')
  ​
  # 因此,我們再放入資料之前,可以先看一下佇列的狀態,如果已經滿了,就不繼續put了。
  print(q.full()) #返回True ,滿了
  print(q.get())  #get()方法是從容器裡拿資料
  print(q.get())
  print(q.get())
  # 同put方法一樣,如果佇列已經空了,那麼繼續取就會出現阻塞。
  try:
      q.get_nowait() # 可以使用get_nowait,如果佇列滿了不會阻塞,但是會因為沒取到值而報錯。
  except: # 因此我們可以用一個try語句來處理這個錯誤。這樣程式不會一直阻塞下去。
      print('佇列已經空了')
  ​
  print(q.empty()) #空了

  

例子

  
  import time
  from multiprocessing import Queue, Process
  ​
  ​
  def task(q):
      q.put(" hello! 時間%s"%time.asctime())  # 呼叫主函式中p程序傳遞過來的程序引數 put函式為向佇列中新增一條資料。
  ​
  ​
  if __name__ == '__main__':
      q = Queue(3)#建立一個Queue物件
      p = Process(target=task, args=(q,)) #建立一個子程序
      p.start()
      print(q.get()) #在主程序列印從子程序獲取的資料

  


   

 

生產者消費者模型

生產者消費者模型

在併發程式設計中使用生產者和消費者模式能夠解決絕大多數併發問題。該模式通過平衡生產執行緒和消費執行緒的工作能力來提高程式的整體處理資料的速度。

為什麼要使用生產者消費者模型

生產者指的是生產資料的任務,消費者指的是處理資料的任務,在併發程式設計中,如果生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產資料。同樣的道理,如果消費者的處理能力大於生產者,那麼消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。

什麼是生產者和消費者模式

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞佇列來進行通訊,所以生產者生產完資料之後不用等待消費者處理,直接扔給阻塞佇列,消費者不找生產者要資料,而是直接從阻塞佇列裡取,阻塞佇列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。

 

基於佇列實現生產者消費者模型

  
  from multiprocessing import Process, Queue
  import time, random, os
  ​
  ​
  def consumer(q):
      while True:
          res = q.get()
          time.sleep(random.randint(1, 3))
          print('\033[45m%s 吃 %s\033[0m' % (os.getpid(), res))
  ​
  ​
  def producer(q):
      for i in range(10):
          time.sleep(random.randint(1, 3))
          res = '包子%s' % i
          q.put(res)
          print('\033[44m%s 生產了 %s\033[0m' % (os.getpid(), res))
  ​
  ​
  if __name__ == '__main__':
      q = Queue()
      # 生產者們:即廚師們
      p1 = Process(target=producer, args=(q,))
  ​
      # 消費者們:即吃貨們
      c1 = Process(target=consumer, args=(q,))
  ​
      # 開始
      p1.start()
      c1.start()
      print('主')
 

  

生產者消費者模型總結

  
  #程式中有兩類角色
      一類負責生產資料(生產者)
      一類負責處理資料(消費者)
      
  #引入生產者消費者模型為了解決的問題是:
      平衡生產者與消費者之間的工作能力,從而提高程式整體處理資料的速度
      
  #如何實現:
      生產者<-->佇列<——>消費者
  #生產者消費者模型實現類程式的解耦和

  

此時的問題是主程序永遠不會結束,原因是:生產者p在生產完後就結束了,但是消費者c在取空了q之後,則一直處於死迴圈中且卡在q.get()這一步。

解決方式無非是讓生產者在生產完畢後,往佇列中再發一個結束訊號,這樣消費者在接收到結束訊號後就可以break出死迴圈

  import time, random, os
  from multiprocessing import Process, Queue
  ​
  ​
  def consumer(q):
      while True:
          res = q.get()
          if res is None: break  # 收到結束訊號則結束
          time.sleep(random.randint(1, 3))
          print('\033[45m%s 吃 %s\033[0m' % (os.getpid(), res))
  ​
  ​
  def producer(q):
      for i in range(10):
          time.sleep(random.randint(1, 3))
          res = '包子%s' % i
          q.put(res)
          print('\033[44m%s 生產了 %s\033[0m' % (os.getpid(), res))
      q.put(None)  # 傳送結束訊號,生產者在生產完畢後傳送結束訊號None
  ​
  ​
  if __name__ == '__main__':
      q = Queue()
      # 生產者們:即廚師們
      p1 = Process(target=producer, args=(q,))
  ​
      # 消費者們:即吃貨們
      c1 = Process(target=consumer, args=(q,))
  ​
      # 開始
      p1.start()
      c1.start()
      print('主')

  

  

 

注意:結束訊號None,不一定要由生產者發,主程序裡同樣可以發,但主程序需要等生產者結束後才應該傳送該訊號。但上述解決方式,在有多個生產者和多個消費者時,需要多次傳送None訊號。

  import multiprocessing
  import time
  import random
  ​
  ​
  def producer(name, q):
      for i in range(2):
          res = "包子%s" % i
          time.sleep(random.randint(0, 1))
          print("%s生產了%s" % (name, res))
          q.put(res)
  ​
  ​
  def consumer(name, q):
      while True:
          res = q.get()
          if q.get() is None:  # 收到結束訊號則結束
              print("沒包子吃了")
              break
          print("%s吃了%s" % (name, res))
  ​
  ​
  if __name__ == "__main__":
      q = multiprocessing.Queue()
      p1 = multiprocessing.Process(target=producer, args=("jack", q))
      p2 = multiprocessing.Process(target=producer, args=("charles", q))
      p3 = multiprocessing.Process(target=producer, args=("pony", q))
      c1 = multiprocessing.Process(target=consumer, args=("nick", q))
      c2 = multiprocessing.Process(target=consumer, args=("nicholas", q))
      p_list = []
      p_list.append(p1)
      p_list.append(p2)
      p_list.append(p3)
      for p in p_list:
          p.start()
      c1.start()
      c2.start()
      p1.join() #必須保證生產者全部生產完畢,才應該傳送結束訊號
      p2.join()
      p3.join()
      q.put(None)  # 傳送結束訊號,有幾個消費者就應該傳送幾次結束訊號None
      q.put(None)  # 傳送結束訊號
      print("end........")
 

  

  

這裡有另外一種佇列提供了這種機制,JoinableQueue。

JoinableQueue([maxsize])

其實就是一種佇列,但又比佇列要多兩種方法,task_done()和join()方法,正是有這兩種方法就可以解決上面的問題。

建立可連線的共享程序佇列。這就像是一個Queue物件,但佇列允許專案的使用者通知生產者專案已經被成功處理。通知程序是使用共享的訊號和條件變數來實現的。

 

方法介紹

  
  JoinableQueue的例項p除了與Queue物件相同的方法之外,還具有以下方法:
  ​
  q.task_done() 
  使用者使用此方法發出訊號,表示q.get()返回的專案已經被處理。如果呼叫此方法的次數大於從佇列中刪除的專案數量,將引發ValueError異常。
  ​
  q.join() 
  生產者將使用此方法進行阻塞,直到佇列中所有專案均被處理。阻塞將持續到為佇列中的每個專案均呼叫q.task_done()方法為止。 
  下面的例子說明如何建立永遠執行的程序,使用和處理佇列上的專案。生產者將專案放入佇列,並等待它們被處理。

  

例子

import multiprocessing
import time
import random


def producer(name, q):
    for i in range(2):
        res = "包子%s" % i
        time.sleep(random.randint(0, 1))
        print("%s生產了%s" % (name, res))
        q.put(res)
    q.join()  # 只有顧客把佇列的包子全部拿走後,三個生產者程序才能全部結束


def consumer(name, q):
    while True:
        res = q.get()
        print("%s吃了%s" % (name, res))
        q.task_done()  # 發訊號告訴佇列,又吃完了一個,從佇列中取走一個數據並處理完成


if __name__ == "__main__":
    # q = multiprocessing.Queue()
    q = multiprocessing.JoinableQueue()
    p1 = multiprocessing.Process(target=producer, args=("jack", q))
    p2 = multiprocessing.Process(target=producer, args=("charles", q))
    p3 = multiprocessing.Process(target=producer, args=("pony", q))
    c1 = multiprocessing.Process(target=consumer, args=("nick", q))
    c2 = multiprocessing.Process(target=consumer, args=("nicholas", q))
    p_list = []
    p_list.append(p1)
    p_list.append(p2)
    p_list.append(p3)
    for p in p_list:
        p.start()
    c1.daemon = True  # 將c1\c2設定成守護程序,只要主程序結束了,那麼顧客就收到了所有的資料
    c2.daemon = True
    c1.start()
    c2.start()
    p1.join()
    p2.join()
    p3.join()

    print("end........")
# 主程序等--->p1,p2,p3等---->c1,c2
# p1,p2,p3結束了,證明c1,c2肯定全都收完了p1,p2,p3發到佇列的資料
# 因而c1,c2也沒有存在的價值了,不需要繼續阻塞在程序中影響主程序了。
# 應該隨著主程序的結束而結束,所以設定成守護程序就可以了。