Python佇列與多執行緒及檔案鎖
阿新 • • 發佈:2018-11-14
佇列實現生產-多執行緒消費
先看程式碼
# -*- coding: utf-8 -*- import queue import threading mu = threading.Lock() class Producer(threading.Thread): def __init__(self, data_queue, thread_name): super(Producer, self).__init__() self.data_queue = data_queue self.thread_name = thread_namedef run(self): for i in range(30): self.data_queue.put(i) print "完成生產:{}".format(i) class Customer(threading.Thread): def __init__(self, data_queue, thread_name): super(Customer, self).__init__() self.data_queue = data_queue self.thread_name= thread_name def run(self): while True: if mu.acquire(True): try: data = self.data_queue.get(True, 3) print "%s完成消費: %s" % (self.thread_name, data) with open('ax.txt', 'a+') as f: f.write('customer_'+str(data)+'\n') except: self.data_queue.task_done() break mu.release() q = queue.Queue() producer = Producer(q, 'Pro') producer.start() producer.join() threads = [] for i in range(1, 5): name = 'Cus{}'.format(i) customer = Customer(q, name) customer.start() threads.append(customer) for j in threads: j.join()
Python佇列使用的是queue模組,多執行緒使用的是threading模組
生產者:Producer類,不斷的向佇列中新增元素,這裡是新增數字1-30.
消費者:Customer類,建立4個執行緒,然後不斷的從佇列中取出元素進行“消費”。
這裡有兩個注重點:
1)寫操作,因為這裡是要寫入檔案的,所以,如果不加鎖的話,就會出現順序混亂,可能會覆蓋資料的情況。
對此,可以先建立一個鎖,進行寫操作時就加上鎖,完成後釋放鎖
# 建立鎖 mu = threading.Lock() # 加鎖 mu.acquire() # 釋放鎖 mu.release()
2) 執行緒退出,當佇列為空時,我想要退出執行緒結束。如果不做點東西(超時設定),執行緒就會一直想要從佇列獲取元素而阻塞,不會退出。
所以,從佇列獲取元素時設定超時時間,超時後會引發異常。上面程式碼 self.data_queue.get(True, 3) 設定了超時時間為3秒,當佇列為空後超過3秒,就會引發異常,執行
self.data_queue.task_done(),task_done()函式會給執行緒傳送訊號,後續的 join() 函式才會生效,退出執行緒。