1. 程式人生 > >Python中多執行緒的阻塞問題

Python中多執行緒的阻塞問題

在使用Queue模組+多執行緒模擬生產者+消費者問題時,遇到了一個小問題,現在記錄下來。供可能會遇到類似問題的初學者們參考。

該問題的完整參考程式碼如下。主要實現了以下的功能:在一個執行緒中,開啟生產者模式,生成出來的object會put進一個Queue物件queue中。除此以外,在n個執行緒中(本程式碼中n為5,nfuncs = 5),開啟消費者模式,每一個消費者執行緒會在一個while迴圈中不斷地從queue中消耗一個object(使用get方法),直到生產者生產出的object被全部消耗(本程式碼中設為100個object)。

from random import randint
from time import sleep
from Queue import Queue
from threading import Thread,Lock,currentThread
lock = Lock()

def writes(queue):
    print "producing object for Q...",
    queue.put('xxx', 1)
    print "size now", queue.qsize()

def readQ(queue):
    val = queue.get(1)
    print "consumed object from Q... size now", \
          queue.qsize()
    print currentThread().name

def writer(queue, loops):
    for i in range(loops):
        lock.acquire()
        writes(queue)
        lock.release()
        sleep(0.1)

def read(queue):
    while queue.qsize() >0 :
        sleep(randint(2, 4))
        lock.acquire()
        readQ(queue)
        lock.release()

funcs = [writer, read]
nfuncs = range(5)

def main():
    nloops1 = 100
    q = Queue(1024)

    thread = []
    t = Thread(target = writer, args = (q, nloops1))
    thread.append(t)

    for i in nfuncs:
        t = Thread(target = read, args = (q,))
        thread.append(t)


    for i in range(len(thread)):
        thread[i].start()

    for i in range(len(thread)):
        thread[i].join()

    print "All Done"

if __name__ == '__main__':
    main()
為了防止生產速度跟不上消費的速度,生產執行緒中每次生產僅間隔0.1秒,且在消費執行緒中每次消費之前,隨機sleep 2~3秒。

在執行之後,生產的object數量達到100(實際不會print 100號object的生成,因為在生產的過程中已經開始消費),然後多個執行緒開始消費。然而在把object數量消費至0以後,執行緒們並不會結束,既“print “”All done“””語句一直沒有被執行。

思考以後,得出了三種解決途徑:

1.線上程的Join方法中加入引數timeout,如果執行緒阻塞,執行緒執行時間達到timeout時,將中止該執行緒。

該方法的缺點在於當生產數量不確定時,timeout的時間無法很好的確定。如果join的時間太短,可能有的程序還在執行,主程序就繼續運行了。如果join的時間太長,線上程很多的情況下,將會阻塞很長的一段時間。

2.試圖考慮為什麼執行緒會阻塞。

發現在read函式中,如前所述,在每次消費之前,隨機sleep 2~3秒。於是可能會出現以下的問題。

當 queue.qsize() = 1的時候,某個執行緒x進入了while迴圈,然後開始睡眠2~3秒,在這個睡眠過程中,GUI切換至其它的執行緒,此時由於執行緒x處於睡眠,並沒有呼叫readq函式,因此queue中仍然有一個元素。以此類推,每個程序都在queue.qsize() = 1時進入了while迴圈,然後最早結束睡眠的執行緒將呼叫readq函式中的queue.get(1)。之後其它程序在呼叫queue.get(1)時,將會因為queue中缺少元素阻塞。

解決方法如下: 

def read(queue):
    sleep(randint(2, 4))
    while queue.qsize():
        lock.acquire()
        readQ(queue)
        lock.release()
        sleep(randint(2, 4))

在while之前睡眠,且將每次消費時所需的睡眠放至readQ函式之後。可避免多個執行緒同時進入while迴圈,該改進將不會

引起阻塞。

3.Queue中的get方法,若將其引數設定為非0,則不會因為佇列中沒有可用元素而阻塞。將get的引數設定為0,

利用try/excep語句,當get不到資料引起異常時,excep一個break,中斷執行緒。