1. 程式人生 > >執行緒佇列資料共享與生產者消費者模型

執行緒佇列資料共享與生產者消費者模型

執行緒佇列資料共享與生產者消費者模型

queue模組簡介

  queue模組提供了多種佇列,那麼它主要是用於多執行緒程式設計中的資料共享。

  我們都知道同一程序下的資料是能被多個執行緒共享的,那麼為什麼這些執行緒在同一程序下還去使用佇列呢?

 

  因為佇列是: 管道 + 鎖

 

  所以使用佇列來存放多個執行緒中用於共享的資料還是為了保證其資料的安全性。

  queue模組中的佇列主要是用於本地測試中使用,正式上線時還得要使用Redis,這個是後話,我們先來看關於queue模組的使用。

       官方中文文件 

 

方法大全

 

queue模組中的佇列方法大全 
方法名稱 功能描述
Queue.qsize() 返回當前佇列的大小
Queue.empty() 判斷當前佇列是否為空
Queue.full() 判斷當前佇列是否已滿
Queue.put(item, block=True, timeout=None) item放入佇列中,block引數為如果要操作的佇列目前已滿是否阻塞,timeout為超時時間。
Queue.put_nowait(item) 相當於 put(item, False),如果操作的佇列已滿則不進行阻塞,而是丟擲Full
異常。
Queue.get(block=True, timeout=None) 將專案從佇列中取出,block引數為如果要操作的佇列目前為空是否阻塞,timeout為超時時間。
Queue.get_nowait() 相當於 get(False),如果要操作的佇列為空則不進行阻塞,而是丟擲Empty異常。
Queue.task_done() 當消費者執行緒被join()阻塞後,生產者執行緒呼叫此方法會停止消費者執行緒的阻塞狀態。詳情請見下面補充
Queue.join() 當消費者執行緒被join()阻塞後,需等待消費者執行緒呼叫task_done()方法後方可停止阻塞。詳情請見下面的補充
注意:如遇到異常情況請檢視官方文件,這個模組的方法比較特殊 

 

三種不同的佇列

  在queue模組中,提供了三種佇列。如下:

 

  queue.Queue:先進先出佇列

  queue.LifoQueue:先進後出佇列

  queue.PriorityQueue:優先順序佇列

 

import queue

# ==== 先進先出佇列 ====

q1 = queue.Queue(maxsize=5)  # 該佇列最大可存放5個專案
q1.put(1)   # 入隊操作
q1.put(2)
q1.put(3)

print(q1.get())  # 出隊操作
print(q1.get())
print(q1.get())

# ==== 執行結果  ====

"""
1
2
3
"""


# ==== 先進後出佇列 ====

q2 = queue.LifoQueue(maxsize=5)  # 該佇列最大可存放5個專案
q2.put(1)   # 入隊操作
q2.put(2)
q2.put(3)

print(q2.get())  # 出隊操作
print(q2.get())
print(q2.get())

# ==== 執行結果  ====

"""
3
2
1
"""

# ==== 優先順序佇列 ====

q3 = queue.PriorityQueue(maxsize=5)
q2.put([10,"優先順序為10"])   # 入隊操作
q2.put([20,"優先順序為20"])
q2.put([30,"優先順序為30"])

print(q2.get())  # 出隊操作,如果想取出具體元素,加個 索引[1] 即可
print(q2.get())
print(q2.get())

# ==== 執行結果  ====

"""
[30, '優先順序為30']
[20, '優先順序為20']
[10, '優先順序為10']
"""
三種佇列的簡單實用

 

生產者消費者模型

  生產者消費者模型是一種思想,大概意思就是我們不讓生成者與消費者之間進行直接接觸,而是通過某種緩衝的方式讓彼此之間的耦合度降低。生產者生產出了產品,消費者就可以進行購買,如果沒有產品就等著。

  那麼這個時候我們就可以使用隊列了,因為佇列裡有阻塞的機制,我畫一幅圖,來看一眼。

來,我們嘗試用程式碼實現一下:

import threading
import queue
import time


def producer():
    """生產者"""
name = "食神周樹人" count = 1 # 計數器 while count < 11: # 每天只做10個包子 produce = "包子" print("包子做好了,這是今天的第{0}個包子".format(count)) q.put(produce + str(count)) # 將產品放在管道中 count += 1 time.sleep(0.4) # 廚師0.4秒做一個包子 def consumer(): """消費者"""
name = threading.current_thread().getName() print("{0}正在等包子".format(name)) while 1: try: food = q.get(timeout=2) # 等拿包子,最多等2秒 time.sleep(0.2) # 消費者0.2秒吃一個包子 print("{0}把{1}吃了,真好吃!".format(name, food)) except queue.Empty: # 有的人等的不耐煩就不等了 print("{0}說:等了這麼久還沒有,不等了".format(name)) return if __name__ == '__main__': q = queue.Queue(maxsize=10) # 一次最多放10個產品 name = ["大耳朵", "二狗", "三蛋子"] # 消費者姓名列表 for i in range(3): t1 = threading.Thread(target=consumer, name=name[i]) t1.start() producer() # 生產者啟動,目前只有一個生產者,也可以多個 # ==== 執行結果 ==== """ 大耳朵正在等包子 二狗正在等包子 三蛋子正在等包子包子做好了,這是今天的第1個包子 三蛋子把包子1吃了,真好吃! 包子做好了,這是今天的第2個包子 二狗把包子2吃了,真好吃! 包子做好了,這是今天的第3個包子 大耳朵把包子3吃了,真好吃! 包子做好了,這是今天的第4個包子 三蛋子把包子4吃了,真好吃! 包子做好了,這是今天的第5個包子 二狗把包子5吃了,真好吃! 包子做好了,這是今天的第6個包子 大耳朵把包子6吃了,真好吃! 包子做好了,這是今天的第7個包子 三蛋子把包子7吃了,真好吃! 包子做好了,這是今天的第8個包子 二狗把包子8吃了,真好吃! 包子做好了,這是今天的第9個包子 大耳朵把包子9吃了,真好吃! 包子做好了,這是今天的第10個包子 三蛋子把包子10吃了,真好吃! 二狗說:等了這麼久還沒有,不等了 大耳朵說:等了這麼久還沒有,不等了 三蛋子說:等了這麼久還沒有,不等了 """

 

補充:join()與task_done()

import threading
import queue
import time

def task_1():
    print("正在裝東西..")
    time.sleep(3)
    q.put("玫瑰花")  # 正在裝東西
    q.task_done()  # 通知對方可以取了


def task_2():
    q.join() # 阻塞等待通知,接到通知說明佇列裡裡有東西了。
    print("取到了",q.get())  # 取東西


if __name__ == '__main__':

    q = queue.Queue(maxsize=5)

    t1 = threading.Thread(target=task_1,name="小明")
    t2 = threading.Thread(target=task_2,name="小花")

    t1.start()
    t2.start()

# ==== 執行結果 ====

"""
正在裝東西..
取到了 玫瑰花
"""

&n