1. 程式人生 > >多線程(threading module)

多線程(threading module)

創建 deadlock range dem one randint 加鎖 called 退出

一、線程與進程

線程定義:線程是操作系統能夠進行運算調度的最小單位。它被包含在進程之中,是進程中的實際運作單位。一條線程指的是進程中一個單一順序的控制流,一個進程中可以並發多個線程,每條線程並行執行不同的任務。

進程定義:An executing instance of a program is called a process.(程序的執行實例稱為進程。)

線程與進程的區別:

  1. 線程共享創建它的進程的地址空間; 進程有自己的地址空間。
  2. 線程可以直接訪問其進程的數據段; 進程擁有自己父進程數據段的副本。
  3. 線程可以直接與其進程的其他線程通信; 進程必須使用進程間通信來與兄弟進程通信。
  4. 新線程很容易創建; 新流程需要復制父流程。
  5. 線程可以對同一進程的線程進行相當大的控制; 進程只能控制子進程。
  6. 對主線程的更改(取消,優先級更改等)可能會影響進程的其他線程的行為; 對父進程的更改不會影響子進程。

二、Python GIL(Global Interpreter Lock)

  --> 全局解釋器鎖 :在同一時刻,只能有一個線程進入解釋器。

三、threading 模塊

3.1 線程的2種調用方式

直接調用

技術分享圖片
 1 import threading
 2 import time
 3  
 4 def sayhi(num): #定義每個線程要運行的函數
 5  
 6     print("running on number:%s" %num)
 7  
 8     time.sleep(3)
 9  
10 if __name__ == __main__:
11  
12     t1 = threading.Thread(target=sayhi,args=(1,)) #
生成一個線程實例 13 t2 = threading.Thread(target=sayhi,args=(2,)) #生成另一個線程實例 14 15 t1.start() #啟動線程 16 t2.start() #啟動另一個線程 17 18 print(t1.getName()) #獲取線程名 19 print(t2.getName())
View Code

繼承式調用

技術分享圖片
 1 import threading
 2 import time
 3  
 4  
 5 class MyThread(threading.Thread):
 6     def
__init__(self,num): 7 threading.Thread.__init__(self) 8 self.num = num 9 10 def run(self):#定義每個線程要運行的函數 11 12 print("running on number:%s" %self.num) 13 14 time.sleep(3) 15 16 if __name__ == __main__: 17 18 t1 = MyThread(1) 19 t2 = MyThread(2) 20 t1.start() 21 t2.start()
View Code

3.2 常用方法(Join/Daemon)

  join() --> 在子線程完成運行之前,這個子線程的父線程將一直被阻塞。

  setDaemon(True) --> 將線程聲明為守護線程,必須在start() 方法調用之前設置,守護線程隨主線程結束而結束。

其他方法:

技術分享圖片
 1 threading 模塊提供的其他方法:
 2 # threading.currentThread(): 返回當前的線程變量。
 3 # threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動後、結束前,不包括啟動前和終止後的線程。
 4 # threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
 5 # 除了使用方法外,線程模塊同樣提供了Thread類來處理線程,Thread類提供了以下方法:
 6 # run(): 用以表示線程活動的方法。
 7 # start():啟動線程活動。
 8 # join([time]): 等待至線程中止。這阻塞調用線程直至線程的join() 方法被調用中止-正常退出或者拋出未處理的異常-或者是可選的超時發生。
 9 # isAlive(): 返回線程是否活動的。
10 # getName(): 返回線程名。
11 # setName(): 設置線程名。
Method

3.3 同步鎖(Lock)

  r = threading.Lock()  r.acquire() --> 加鎖  r.release() --> 解鎖

技術分享圖片
 1 import time
 2 import threading
 3 
 4 def addNum():
 5     global num #在每個線程中都獲取這個全局變量
 6     # num-=1
 7     lock.acquire()
 8     temp=num
 9     print(--get num:,num )
10     #time.sleep(0.1)
11     num =temp-1 #對此公共變量進行-1操作
12     lock.release()
13 
14 num = 100  #設定一個共享變量
15 thread_list = []
16 lock=threading.Lock()
17 
18 for i in range(100):
19     t = threading.Thread(target=addNum)
20     t.start()
21     thread_list.append(t)
22 
23 for t in thread_list: #等待所有線程執行完畢
24     t.join()
25 
26 print(final num:, num )
View Code

3.4 線程死鎖和遞歸鎖

  在線程間共享多個資源的時候,如果兩個線程分別占有一部分資源並且同時等待對方的資源,就會造成死鎖,因為系統判斷這部分資源都正在使用,所有這兩個線程在無外力作用下將一直等待下去。下面是一個死鎖的例子:

技術分享圖片
 1 import threading,time
 2 
 3 class myThread(threading.Thread):
 4     def doA(self):
 5         lockA.acquire()
 6         print(self.name,"gotlockA",time.ctime())
 7         time.sleep(3)
 8         lockB.acquire()
 9         print(self.name,"gotlockB",time.ctime())
10         lockB.release()
11         lockA.release()
12 
13     def doB(self):
14         lockB.acquire()
15         print(self.name,"gotlockB",time.ctime())
16         time.sleep(2)
17         lockA.acquire()
18         print(self.name,"gotlockA",time.ctime())
19         lockA.release()
20         lockB.release()
21     def run(self):
22         self.doA()
23         self.doB()
24 if __name__=="__main__":
25 
26     lockA=threading.Lock()
27     lockB=threading.Lock()
28     threads=[]
29     for i in range(5):
30         threads.append(myThread())
31     for t in threads:
32         t.start()
33     for t in threads:
34         t.join()
deadLock

解決辦法:使用遞歸鎖

  即重新定義一把鎖:lock = threading.RLock() --> 遞歸鎖

  將所有的鎖替換為遞歸鎖即可。遞歸鎖可以重復加鎖。

  RLock內部維護著一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源可以被多次acquire。直到一個線程所有的acquire都被release,其他的線程才能獲得資源。

3.5 信號量(Semaphore)--> 相當於一把鎖

  信號量用來控制線程並發數的,BoundedSemaphore或Semaphore管理一個內置的計數 器,每當調用acquire()時-1,調用release()時+1。

  計數器不能小於0,當計數器為 0時,acquire()將阻塞線程至同步鎖定狀態,直到其他線程調用release()。(類似於停車位的概念)

  BoundedSemaphore與Semaphore的唯一區別在於前者將在調用release()時檢查計數 器的值是否超過了計數器的初始值,如果超過了將拋出一個異常。

技術分享圖片
 1 import threading,time
 2 class myThread(threading.Thread):
 3     def run(self):
 4         if semaphore.acquire():
 5             print(self.name)
 6             time.sleep(5)
 7             semaphore.release()
 8 if __name__=="__main__":
 9     semaphore=threading.Semaphore(5)
10     thrs=[]
11     for i in range(100):
12         thrs.append(myThread())
13     for t in thrs:
14         t.start()
Semaphore

3.6 條件變量同步(Condition)--> 鎖

  有一類線程需要滿足條件之後才能夠繼續執行,Python提供了threading.Condition 對象用於條件變量線程的支持,它除了能提供RLock()或Lock()的方法外,還提供了 wait()、notify()、notifyAll()方法。

lock_con=threading.Condition([Lock/Rlock]): 鎖是可選選項,不傳入鎖,對象自動創建一個RLock()。

1 wait():條件不滿足時調用,線程會釋放鎖並進入等待阻塞;
2 notify():條件創造後調用,通知等待池激活一個線程;
3 notifyAll():條件創造後調用,通知等待池激活所有線程。
技術分享圖片
 1 import threading,time
 2 from random import randint
 3 class Producer(threading.Thread):
 4     def run(self):
 5         global L
 6         while True:
 7             val=randint(0,100)
 8             print(生產者,self.name,":Append"+str(val),L)
 9             if lock_con.acquire():
10                 L.append(val)
11                 lock_con.notify()
12                 lock_con.release()
13             time.sleep(3)
14 class Consumer(threading.Thread):
15     def run(self):
16         global L
17         while True:
18                 lock_con.acquire()
19                 if len(L)==0:
20                     lock_con.wait()
21                 print(消費者,self.name,":Delete"+str(L[0]),L)
22                 del L[0]
23                 lock_con.release()
24                 time.sleep(0.25)
25 
26 if __name__=="__main__":
27 
28     L=[]
29     lock_con=threading.Condition()
30     threads=[]
31     for i in range(5):
32         threads.append(Producer())
33     threads.append(Consumer())
34     for t in threads:
35         t.start()
36     for t in threads:
37         t.join()
Condition Demo

3.7 同步條件(Event)

條件同步和條件變量同步差不多意思,只是少了鎖功能,因為條件同步設計於不訪問共享資源的條件環境。event=threading.Event():條件環境對象,初始值 為False;

1 event.isSet():返回event的狀態值;
2 
3 event.wait():如果 event.isSet()==False將阻塞線程;
4 
5 event.set(): 設置event的狀態值為True,所有阻塞池的線程激活進入就緒狀態, 等待操作系統調度;
6 
7 event.clear():恢復event的狀態值為False。

示例:

技術分享圖片
 1 import threading,time
 2 class Boss(threading.Thread):
 3     def run(self):
 4         print("BOSS:今晚大家都要加班到22:00。")
 5         event.isSet() or event.set()
 6         time.sleep(5)
 7         print("BOSS:<22:00>可以下班了。")
 8         event.isSet() or event.set()
 9 class Worker(threading.Thread):
10     def run(self):
11         event.wait()
12         print("Worker:哎……命苦啊!")
13         time.sleep(0.25)
14         event.clear()
15         event.wait()
16         print("Worker:OhYeah!")
17 if __name__=="__main__":
18     event=threading.Event()
19     threads=[]
20     for i in range(5):
21         threads.append(Worker())
22     threads.append(Boss())
23     for t in threads:
24         t.start()
25     for t in threads:
26         t.join()
View Code

3.8 隊列(queue)-->多線程利器

queue中的方法:

創建一個“隊列”對象
import Queue
q = Queue.Queue(maxsize = 10)
Queue.Queue類即是一個隊列的同步實現。隊列長度可為無限或者有限。可通過Queue的構造函數的可選參數maxsize來設定隊列長度。如果maxsize小於1就表示隊列長度無限。

將一個值放入隊列中
q.put(10)
調用隊列對象的put()方法在隊尾插入一個項目。put()有兩個參數,第一個item為必需的,為插入項目的值;第二個block為可選參數,默認為
1。如果隊列當前為空且block為1,put()方法就使調用線程暫停,直到空出一個數據單元。如果block為0,put方法將引發Full異常。

將一個值從隊列中取出
q.get()
調用隊列對象的get()方法從隊頭刪除並返回一個項目。可選參數為block,默認為True。如果隊列為空且block為True,get()就使調用線程暫停,直至有項目可用。如果隊列為空且block為False,隊列將引發Empty異常。

Python Queue模塊有三種隊列及構造函數:
1、Python Queue模塊的FIFO隊列先進先出。  class queue.Queue(maxsize)
2、LIFO類似於堆,即先進後出。             class queue.LifoQueue(maxsize)
3、還有一種是優先級隊列級別越低越先出來。   class queue.PriorityQueue(maxsize)

此包中的常用方法(q = Queue.Queue()):
q.qsize() 返回隊列的大小
q.empty() 如果隊列為空,返回True,反之False
q.full() 如果隊列滿了,返回True,反之False
q.full 與 maxsize 大小對應
q.get([block[, timeout]]) 獲取隊列,timeout等待時間
q.get_nowait() 相當q.get(False)
非阻塞 q.put(item) 寫入隊列,timeout等待時間
q.put_nowait(item) 相當q.put(item, False)
q.task_done() 在完成一項工作之後,q.task_done() 函數向任務已經完成的隊列發送一個信號
q.join() 實際上意味著等到隊列為空,再執行別的操作

示例:

技術分享圖片
 1 import threading,queue
 2 from time import sleep
 3 from random import randint
 4 class Production(threading.Thread):
 5     def run(self):
 6         while True:
 7             r=randint(0,100)
 8             q.put(r)
 9             print("生產出來%s號包子"%r)
10             sleep(1)
11 class Proces(threading.Thread):
12     def run(self):
13         while True:
14             re=q.get()
15             print("吃掉%s號包子"%re)
16 if __name__=="__main__":
17     q=queue.Queue(10)
18     threads=[Production(),Production(),Production(),Proces()]
19     for t in threads:
20         t.start()
Demo

多線程(threading module)