1. 程式人生 > >python threading queue模塊中join setDaemon及task_done的使用方法及示例

python threading queue模塊中join setDaemon及task_done的使用方法及示例

獨立 pro span put 安全 with 錯誤信息 strong 消費

threading:

t.setDaemon(True) 將線程設置成守護線程,主進行結束後,此線程也會被強制結束。如果線程沒有設置此值,則主線程執行完畢後還會等待此線程執行。

t.join() 線程阻塞,只有當線程運行結束後才會繼續執行後續語句

示例:

#coding: utf-8
import threading
import time

def foo(name):
    time.sleep(2)
    print this is %s \n % (name,)
    
if __name__ == __main__
: mythread = [] for i in range(5): t = threading.Thread(target=foo, args=(i, )) # t.setDaemon(True) t.start() mythread.append(t) # for t in mythread: # t.join() print -- end --

運行結果(註意,print為非線程安全,所以打印內容有時會比較亂):

-- end --

this is 1

this is 2

this is 4

this is 0

this is 3

Process finished with exit code 0

可以看到,主線程和子線程是獨立運行的(最後一行先被打印),主線程運行結束後依舊等待子線程結束。

把上面的t.setDaemon(True)取消註釋,再運行一遍,發現只打印了如下內容:

-- end --

Process finished with exit code 0

即主線程運行結束後,會強制結束掉子線程

我們繼續再把下面的註釋去掉

# for t in mythread:

# t.join()

再運行一遍,輸出如下:

this is 1

this is 0

this is 3

this is 4

this is 2

-- end --

Process finished with exit code 0

註意:end在最後才被輸出,說明在join那裏阻塞了主線程的運行,在等待子線程運行完成。

queue:

q.put(item) 將item放入隊列中。

q.get() 從隊列中取出數據。

q.task_done() 每次從queue中get一個數據之後,當處理好相關問題,最後調用該方法,以提示q.join()是否停止阻塞,讓線程向前執行或者退出;

q.join() 阻塞,直到queue中的數據已經每項已經task_done處理到空。

如果不使用task_done也可以,可以通過q.full() q.empty()等來判斷

q.join()隱藏的問題:

對於生產者-消費者模型,這種阻塞方式是有漏洞的,因為如果queue初始為空,q.join()會直接停止阻塞,繼續執行後續語句。

還有另一種情況,就是生產者生產速度比較慢,而消費者消費速度比較快,也有可能停止阻塞,繼續執行後續語句

如果有多個消費者,沒有生產者,且queue始初化為一定的數據量,則可以正常執行。

示例:

#coding: utf-8
import Queue
import threading
import time

queue = Queue.Queue(maxsize=3)

def produce():
    for i in range(5):
        item = "item" + str(i)
        queue.put(item)
        print "%s produce" % (item, )
        # time.sleep(4)
        
def customer():
    while True:
        time.sleep(2)
        item = queue.get()
        print "process %s finished" % (item, )
        queue.task_done()
        
if __name__ == __main__:
    t = threading.Thread(target=produce)
    t.setDaemon(True)
    t.start()
    for i in range(3):
        c = threading.Thread(target=customer)
        c.setDaemon(True)
        c.start()
    queue.join()
    print -- end --
 

運行輸出如下:

item0 produce

item1 produce

item2 produce

process item0 finished

process item2 finishedprocess item1 finished

item3 produce

item4 produce

process item3 finished

process item4 finished

-- end --

Process finished with exit code 0

可以看到程序正常結束,end字符也在最後在打印出來。

我們把produce函數中註釋掉如下語句,

queue.put(item)

print "%s produce" % (item, )

運行得到:

-- end --

Process finished with exit code 0

即queue為空,join()方法並不阻塞線程。

我們在produce函數中加上sleep,讓生產慢一點,去掉time.sleep(4)前面的註釋,運行得到:

item0 produce

process item0 finished

-- end --

Process finished with exit code 0

有時運行有可能會得到這樣的錯誤信息:

Exception in thread Thread-3 (most likely raised during interpreter shutdown)

報錯信息表明主線程不等待子線程就結束了。

可以發現,其實produce函數中應該還有任務要生成,但因為太慢,在join語句那裏檢測到隊列為已經全部被設置task_done,就會繼續往後執行,這有可能有時並不我們需要的,針對這樣的情況,我們可以在queue.join()前加多一個子句t.join()即可達成目的。

如果我們再把customer函數中的queue.task_done()函數去掉,運行得到

item0 produce

process item0 finished

item1 produce

process item1 finished

item2 produce

process item2 finished

item3 produce

process item3 finished

item4 produce

process item4 finished

註意程序一直沒有結束,而最後一行end語句也沒有出現。因為在join那裏,雖然隊列已經全部為消費完,已經為0,但是由於它不是調用task_done函數而讓其計數為0,所以此時,函數會一直阻塞在這裏。

python threading queue模塊中join setDaemon及task_done的使用方法及示例