python 多線程並發threading & 任務隊列Queue
https://docs.python.org/3.7/library/concurrency.html
python程序默認是單線程的,也就是說在前一句語句執行完之前後面的語句不能繼續執行
先感受一下線程,一般情況下:
def testa(): sleep(1) print "a" def testb(): sleep(1) print "b" testa() testb()
#先隔出一秒打印出a,再過一秒打出b
但是如果用了threading的話:
ta = threading.Thread(target=testa) tb = threading.Thread(target=testb)for t in [ta,tb]: t.start() for t in [ta,tb]: t.join() print "DONE" #輸出是ab或者ba(緊貼著的)然後空一行再來DONE的結果。
得到這樣的結果是因為這樣的,在start之後,ta首先開始跑,但是主線程(腳本本身)沒有等其完成就繼續開始下一輪循環,然後tb也開始了,在之後的一段時間裏,ta和tb兩條線程(分別代表了testa和testb這兩個過程)共同執行。相對於一個個叠代而言,這樣做無疑是大大提高了運行的速度。
Thread類為線程的抽象類,其構造方法的參數target指向一個函數對象,即該線程的具體操作。此外還可以有args=<tuple>來給target函數傳參數。需要註意的是當傳任何一個序列進去的話Thread會自動把它分解成單個單個的元素然後分解傳給target函數。我估計在定義的時候肯定是*args了。
join方法是個很tricky的東西,至今還不是很清楚地懂這是個什麽玩意兒。join([timeout])方法阻塞了主線程,直到調用此方法的子線程完成之後主線程才繼續往下運行。(之前我糊裏糊塗地把join就緊緊接在start後面寫了,如果這麽寫了的話那麽多線程在速度上就毫無優勢,和單線程一樣了= =)。而像上面這個示例一樣,先一個遍歷把所有線程 都啟動起來,再用一個遍歷把所有線程都join一遍似乎是比較通行的做法。
關於線程鎖
多線程程序涉及到一個問題,那就是當不同線程要對同一個資源進行修改或利用時會出現混亂,所以有必要引入線程鎖。
可以通過Thread.Lock類來創建簡單的線程鎖。lock = threading.Lock()即可。在某線程start之前,讓lock.acquire(),且lock在acquire()之後不能再acquire,否則會報錯。當線程結束後調用lock.release()來釋放鎖就好了。一般而言,有鎖的多線程場景可以提升一部分效率,但在寫文件等時機下會有阻塞等待的情況。相比之下,無所多線程場景可以進一步提升效率,但是可能會引起讀寫沖突等問題,所以要慎用。一定要確認各個線程間沒有共同的資源之類的問題後再實行無鎖多線程。
● 以上的包裝線程的方式是一種面向過程的方法,下面介紹一下如何面向對象地來抽象線程
面向對象地抽象線程需要自定義一個類繼承Thread類。比如自定義class MyThread(Thread)。這個類的一個實例就是代表了一個線程,然後通過重載這個類中的run方法(是run,不是start!!但start的動作確實就是調用run)來執行具體的操作。此時鎖可以作為一個構造方法的參數,將一個鎖傳進不同的實例中以實現線程鎖控制。比如:
#方法二:從Thread繼承,並重寫run() class MyThread(threading.Thread): def __init__(self,arg): super(MyThread, self).__init__()#註意:一定要顯式的調用父類的初始化函數。 self.arg=arg def run(self):#定義每個線程要運行的函數 time.sleep(1) print ‘the arg is:%s\r‘ % self.arg for i in xrange(4): t =MyThread(i) t.start() print ‘main thread end!‘
Thread類還有以下的一些方法,自定義的類也可以調用
getName()
setName(...) //其實Thread類在構造方法中有一個name參數,可以為相應的線程取一個名字。這兩個方法就是相關這個名字屬性的
isAlive() 一個線程從start()開始到run()結束的過程中沒有異常,則其實alive的。
setDaemon(True/False) 是否設置一個線程為守護線程。當你設置一個線程為守護線程之後,程序不會等待這個線程結束再退出程序,可參考http://blog.csdn.net/u012063703/article/details/51601579
● 除了Thread類,threading中還有以下一些屬性,簡單介紹一下:
Timer類,Timer(int,target=func) 和Thread類類似,只不過它在int秒過後才以target指定的函數開始線程運行
currentThread() 獲得當前線程對象
activeCount() 獲得當前活動的線程總個數
enumerate() 獲得所有活動線程的列表
settrace(func) 設置一跟蹤函數,在run執行前執行
setprofile(func) 設置一跟蹤函數,在run執行完畢之後執行
Queue用於建立和操作隊列,常和threading類一起用來建立一個簡單的線程隊列。
首先,隊列有很多種,根據進出順序來分類,可以分成
Queue.Queue(maxsize) FIFO(先進先出隊列)
Queue.LifoQueue(maxsize) LIFO(先進後出隊列)
Queue.PriorityQueue(maxsize) 為優先度越低的越先出來
如果設置的maxsize小於1,則表示隊列的長度無限長
FIFO是常用的隊列,其一些常用的方法有:
Queue.qsize() 返回隊列大小
Queue.empty() 判斷隊列是否為空
Queue.full() 判斷隊列是否滿了
Queue.get([block[,timeout]]) 從隊列頭刪除並返回一個item,block默認為True,表示當隊列為空卻去get的時候會阻塞線程,等待直到有有item出現為止來get出這個item。如果是False的話表明當隊列為空你卻去get的時候,會引發異常。在block為True的情況下可以再設置timeout參數。表示當隊列為空,get阻塞timeout指定的秒數之後還沒有get到的話就引發Full異常。
Queue.put(...[,block[,timeout]]) 向隊尾插入一個item,同樣若block=True的話隊列滿時就阻塞等待有空位出來再put,block=False時引發異常。同get的timeout,put的timeout是在block為True的時候進行超時設置的參數。
Queue.task_done() 從場景上來說,處理完一個get出來的item之後,調用task_done將向隊列發出一個信號,表示本任務已經完成
Queue.join() 監視所有item並阻塞主線程,直到所有item都調用了task_done之後主線程才繼續向下執行。這麽做的好處在於,假如一個線程開始處理最後一個任務,它從任務隊列中拿走最後一個任務,此時任務隊列就空了但最後那個線程還沒處理完。當調用了join之後,主線程就不會因為隊列空了而擅自結束,而是等待最後那個線程處理完成了。
結合threading和Queue可以構建出一個簡單的生產者-消費者模型,比如:
import threading import Queue import time class worker(threading.Thread): def __init__(self,queue): threading.Thread.__init__(self) self.queue=queue self.thread_stop=False def run(self): while not self.thread_stop: print("thread%d %s: waiting for tast" %(self.ident,self.name)) try: task=q.get(block=True, timeout=20)#接收消息 except Queue.Empty: print("Nothing to do!i will go home!") self.thread_stop=True break print("task recv:%s ,task No:%d" % (task[0],task[1])) print("i am working") time.sleep(3) print("work finished!") q.task_done()#完成一個任務 res=q.qsize()#判斷消息隊列大小 if res>0: print("fuck!There are still %d tasks to do" % (res)) def stop(self): self.thread_stop = True if __name__ == "__main__": q=Queue.Queue(3) worker=worker(q) worker.start() q.put(["produce one cup!",1], block=True, timeout=None)#產生任務消息 q.put(["produce one desk!",2], block=True, timeout=None) q.put(["produce one apple!",3], block=True, timeout=None) q.put(["produce one banana!",4], block=True, timeout=None) q.put(["produce one bag!",5], block=True, timeout=None) print("***************leader:wait for finish!") q.join()#等待所有任務完成 print("***************leader:all task finished!")
輸出是這樣的
thread139958685849344 Thread-1: waiting for tast 1 task recv:produce one cup! ,task No:1 i am working work finished! fuck!There are still 3 tasks to do thread139958685849344 Thread-1: waiting for tast 1 task recv:produce one desk! ,task No:2 i am workingleader:wait for finish! work finished! fuck!There are still 3 tasks to do thread139958685849344 Thread-1: waiting for tast 1 task recv:produce one apple! ,task No:3 i am working work finished! fuck!There are still 2 tasks to do thread139958685849344 Thread-1: waiting for tast 1 task recv:produce one banana! ,task No:4 i am working work finished! fuck!There are still 1 tasks to do thread139958685849344 Thread-1: waiting for tast 1 task recv:produce one bag! ,task No:5 i am working work finished! thread139958685849344 Thread-1: waiting for tast 1 ***************leader:all task finished! Nothing to do!i will go home!
上例中並沒有性能的提升(畢竟還是只有一個線程在跑)。線程隊列的意義並不是進一步提高運行效率,而是使線程的並發更加有組織。可以看到,在增加了線程隊列之後,程序對於線程的並發數量就有了控制。新線程想要加入隊列開始執行,必須等一個既存的線程完成之後才可以。舉個例子,比如
for i in range(x): t = MyThread(queue) t.start()
x在這裏是個變量,我們不知道這個循環會觸發多少線程並發,如果多的話就會很冒險。但是有了隊列之後,把一個隊列作為所有線程構建線程對象時的一個參數,讓線程必須按照這個隊列規定的大小來執行的話,就不擔心過多線程帶來的危險了。
python 多線程並發threading & 任務隊列Queue