1. 程式人生 > >python 學習第二十四天(同步物件、資訊量、queue庫)

python 學習第二十四天(同步物件、資訊量、queue庫)

  • 同步物件
import threading,time
class Boss(threading.Thread):
    def run(self):
        print("BOSS:今晚大家都要加班到22:00。")
        print(event.isSet())
        event.set()# set flag
        time.sleep(5)
        print("BOSS:<22:00>可以下班了。")
        print(event.isSet())
        event.set()
class
Worker(threading.Thread):
def run(self): event.wait() #等待event的flag被set,才執行下面的程式碼 print("Worker:哎……命苦啊!") time.sleep(1) event.clear()#還原到flag沒有被set的狀態 event.wait() print("Worker:OhYeah!") if __name__=="__main__": event=threading.Event()#建立一個同步物件
threads=[] for i in range(5): threads.append(Worker()) threads.append(Boss()) for t in threads: t.start() for t in threads: t.join()
  • 資訊量
    訊號量用來控制執行緒併發數的,BoundedSemaphore或Semaphore管理一個內建的計數 器,每當呼叫acquire()時-1,呼叫release()時+1。計數器不能小於0,當計數器為 0時,acquire()將阻塞執行緒至同步鎖定狀態,直到其他執行緒呼叫release()。(類似於停車位的概念)BoundedSemaphore與Semaphore的唯一區別在於前者將在呼叫release()時檢查計數器的值是否超過了計數器的初始值,如果超過了將丟擲一個異常。
import threading,time
class myThread(threading.Thread):
    def run(self):
        if semaphore.acquire():
            print(self.name)
            time.sleep(5)
            semaphore.release()
if __name__=="__main__":
    semaphore=threading.Semaphore(5)
    thrs=[]
    for i in range(100):
        thrs.append(myThread())
    for t in thrs:
        t.start()
  • 佇列
建立一個“佇列”物件
import Queue
q = Queue.Queue(maxsize = 10)
Queue.Queue類即是一個佇列的同步實現。佇列長度可為無限或者有限。可通過Queue的建構函式的可選引數maxsize來設定佇列長度。如果maxsize小於1就表示佇列長度無限。

將一個值放入佇列中
q.put(10)
呼叫佇列物件的put()方法在隊尾插入一個專案。put()有兩個引數,第一個item為必需的,為插入專案的值;第二個block為可選引數,預設為
1。如果隊列當前為空且block為1,put()方法就使呼叫執行緒暫停,直到空出一個數據單元。如果block為0,put方法將引發Full異常。

將一個值從佇列中取出
q.get()
呼叫佇列物件的get()方法從隊頭刪除並返回一個專案。可選引數為block,預設為True。如果佇列為空且block為True,
get()就使呼叫執行緒暫停,直至有專案可用。如果佇列為空且block為False,佇列將引發Empty異常。

Python Queue模組有三種佇列及建構函式:
1、Python Queue模組的FIFO佇列先進先出。   class queue.Queue(maxsize)
2、LIFO類似於堆,即先進後出。               class queue.LifoQueue(maxsize)
3、還有一種是優先順序佇列級別越低越先出來。        class queue.PriorityQueue(maxsize)

此包中的常用方法(q = Queue.Queue()):
q.qsize() 返回佇列的大小
q.empty() 如果佇列為空,返回True,反之False
q.full() 如果佇列滿了,返回True,反之False
q.full 與 maxsize 大小對應
q.get([block[, timeout]]) 獲取佇列,timeout等待時間
q.get_nowait() 相當q.get(False)
非阻塞 q.put(item) 寫入佇列,timeout等待時間
q.put_nowait(item) 相當q.put(item, False)
q.task_done() 在完成一項工作之後,q.task_done() 函式向任務已經完成的佇列傳送一個訊號。
q.join() 實際上意味著等到佇列為空或者收到task_done()傳送的訊號,再執行別的操作
複製程式碼

import queue

#先進後出

q=queue.LifoQueue()

q.put(34)
q.put(56)
q.put(12)

#優先順序
# q=queue.PriorityQueue()
# q.put([5,100])
# q.put([7,200])
# q.put([3,"hello"])
# q.put([4,{"name":"alex"}])

while 1:

  data=q.get()
  print(data)
  • 生產者消費者模型
import threading,time,queue,random
q=queue.Queue(maxsize=10)
count=1

def producer(name):
    global count
    while count<41:

        time.sleep(random.randint(3,5))
        print('%s生成了%d號包子\n'%(name,count))
        q.put(count)
        q.task_done()# 可以把這行註釋掉,看看是什麼情況
        count+=1


def client(name):
    count=1
    n=random.randint(4,10)
    while count<=n:
        q.join()
        i=q.get()
        time.sleep(1)
        print('%s:吃了%d號包子'%(name,i))
        print('這是%s吃的第%d個包子\n'%(name,count))
        count+=1




p1=threading.Thread(target=producer,args= ('A'))
p2=threading.Thread(target=producer,args=('B',))


p1.start()
p2.start()


for i in range(5):
    c = threading.Thread(target=client, args=(''.join(('消費者','(',str(i+1),')')),))
    c.start()