Python筆記(十一):多線程
(二)和(三)不感興趣的可以跳過,這裏參考了《深入理解計算機系統》第一章和《Python核心編程》第四章
(一) 多線程編程
一個程序包含多個子任務,並且子任務之間相互獨立,讓這些子任務同時運行就是多線程編程。
(二) 進程
進程是操作系統對一個正在運行的程序的一種抽象(或者說進程指的就是運行中的程序)。無論是在單核還是多核系統中,一個CPU看上去都是在並發執行多個進程,實際上這是通過處理器在進程間的切換來實現的,這種切換稱為上下文切換。(下面只討論一個CPU單處理器的情況)
要運行一個新進程時:操作系統總是 1、保存當前進程的上下文。2、恢復新進程的上下文3、將控制權傳遞給新進程,然後新進程開始執行。
這裏說明下上下文的概念:
操作系統保持跟蹤進程運行所需的所有狀態信息。這種狀態,也就是上下文,包括許多信息,比如PC和寄存器文件的當前值,以及主存的內容。
舉個例子(解釋上下文的概念):你正在和張三談話,這時一個電話打過來,可以說你暫停了和張三談話的進程,然後切入新的進程(打電話),等電話打完後,你和張三從剛才停止的地方繼續交流。
這時想想能在剛剛的基礎上繼續交流的前提是什麽,我想應該是你還記得剛剛談話的內容,剛剛說到了什麽地方,然後才能在這個基礎上繼續交談下去。(計算機也是這樣,一個進程暫停的時候,會記住交談的內容、談到了什麽地方(計算機記住的這些東西就稱為上下文(就是當時進程運行時需要的所有狀態信息))。計算機恢復一個進程的時候,就是先恢復進程的上下文(就像你要繼續交流就要先想起剛剛的談話一樣),所以進程間的切換稱為上下文切換)
(三) 線程
在進程執行的過程中,可能有多個分支或多個步驟,例如執行程序A,可能有三個步驟A1、A2、A3,執行A1、A2、A3的過程就是線程。
例如:用戶向服務器發出請求-服務器接收請求-服務器處理請求-服務器返回資源。這時就可以有:
線程1:負責接收用戶的請求,放到一個隊列中。
線程2:處理請求,並提供輸出
線程3:負責返回資源
所以,一個進程實際上是由多個稱為線程的執行單元組成的,每個線程都運行在進程的上下文中。
關於進程和線程,可以將進程理解為1個完整的任務,線程就是一個個子任務。
子任務1+子任務2+子任務3…組成了一個進程。
(四)
有2個標準庫可以實現多線程,_thread和threading,threading更加先進,有更好的線程支持,推薦使用threading,下面也只對threading進行說明。
threading模塊對象
對象 |
說明 |
Thread |
表示一個執行線程的對象 |
Lock |
鎖原語對象 |
Semaphore |
為線程間共享的有限資源創建一個計數器,計數器值為0時,線程阻塞 |
BoundedSemaphore |
和Semaphore類似,不過它不允許計數器的值超過初始值 |
Thread類
方法 |
說明 |
__init__(self,target=None, args=(),kwargs={})
|
實例化一個線程對象,這裏只說明這3個參數的意思,target指函數名,args和kwargs都指要傳給函數的參數(args傳元組,kwargs傳字典),只要指定一個就行了 例如:有一個函數def loop1(name,t),實例化線程的時候,下面2種方式都是可以的 t = threading.Thread(target=loop1,kwargs={‘name‘:‘線程1‘,‘t‘:5}) t = threading.Thread(target=loop1,args=(‘線程1‘,5)) |
start |
開始執行線程 |
run |
定義線程功能的方法(一般在繼承threading.Thread的子類中重寫該方法) |
join(timeout=None) |
等待直到線程終止 |
接下來:
第五節: 舉個不使用多線程的例子。
第六、七節:說明使用threading.Tread創建多線程的2種方式
第八、九、十:分別說明為什麽要做線程同步、線程同步方式(鎖示例)、線程同步方式(信號量示例)
第十一:說明隊列queue模塊(該模塊提供線程間通信機制,從而讓線程間可以分享數據)
(五) 不使用多線程時的情況(接下來註意不使用多線程和使用多線程執行時間的區別)
1 import time2 def loop1(name,t): 3 print(name+‘開始時間‘ + time.ctime()) 4 time.sleep(t) 5 print(name+‘結束時間‘ + time.ctime()) 6 loop1(‘第一次‘,2) 7 loop1(‘第二次‘,5)
接下來對使用Tread創建多線程的2種方式進行說明:
1、創建Tread實例,傳函數。(六)
2、繼承threading.Thread創建子類,並創建子類的實例。(七)
(六) 創建Tread實例(傳函數),然後調用star啟動線程
target指函數名,args指要傳的參數
1 import threading 2 import time 3 def loop1(name,t): 4 print(name+‘開始時間‘ + time.ctime()) 5 time.sleep(t) 6 print(name+‘結束時間‘ + time.ctime()) 7 8 #創建新線程 9 t = threading.Thread(target=loop1,args=(‘線程1‘,2)) 10 t1 = threading.Thread(target=loop1,args=(‘線程2‘,5)) 11 #啟動線程 12 t.start() 13 t1.start()
(七) 繼承threading.Thread創建子類,實例化後調用star啟動線程
1 import threading 2 import time 3 class test1(threading.Thread): 4 def __init__(self,name,t): 5 threading.Thread.__init__(self) 6 self.name = name 7 self.t = t 8 def run(self): 9 loop1(self.name,self.t) 10 def loop1(name,t): 11 print(name+‘開始時間‘ + time.ctime()) 12 time.sleep(t) 13 print(name+‘結束時間‘ + time.ctime()) 14 #創建新線程 15 t = test1(‘線程1‘,2) 16 t1 = test1(‘線程2‘,5) 17 #啟動線程 18 t.start() 19 t1.start()
(八)線程同步(為什麽需要同步)
1 import threading 2 import time 3 class test1(threading.Thread): 4 def __init__(self,name,t): 5 threading.Thread.__init__(self) 6 self.name = name 7 self.t = t 8 def run(self): 9 print(‘線程1開始修改列表‘+time.ctime()) 10 #[i for i in range(100)]創建一個[0,1,2...99]的列表 11 loop1([i for i in range(100)]) 12 print(‘線程1結束修改列表‘+time.ctime()) 13 class test2(threading.Thread): 14 def __init__(self,name,t): 15 threading.Thread.__init__(self) 16 self.name = name 17 self.t = t 18 def run(self): 19 print(‘線程2打印列表:‘) 20 printList() 21 the_list = [] 22 def loop1(num): 23 for i in num: 24 the_list.append(i) 25 if i ==10: 26 time.sleep(1) 27 def printList(): 28 print(the_list) 29 #創建新線程 30 t = test1(‘線程1‘,2) 31 t1 = test2(‘線程2‘,5) 32 #啟動線程 33 t.start() 34 t1.start()
看上面這段代碼,線程1調用loop1()函數修改列表the_list,線程2調用pringtList()打印the_list.如果不同步,那麽可能出現loop1()函數修改到一半的時候,線程2就把the_list打印出來了,如下圖所示。
(九)線程同步(鎖示例)
接下來說明2種同步原語,鎖和信號量。
通過acquire()獲取鎖,release()釋放鎖。
1 import threading 2 import time 3 class test1(threading.Thread): 4 def __init__(self,name,t): 5 threading.Thread.__init__(self) 6 self.name = name 7 self.t = t 8 def run(self): 9 #獲取鎖,用於線程同步 10 threadLock.acquire() 11 print(‘開始修改列表‘+time.ctime()) 12 #[i for i in range(100)]創建一個[0,1,2...99]的列表 13 loop1([i for i in range(100)]) 14 print(‘結束修改列表‘+time.ctime()) 15 #釋放鎖,開始下一個線程 16 threadLock.release() 17 class test2(threading.Thread): 18 def __init__(self,name,t): 19 threading.Thread.__init__(self) 20 self.name = name 21 self.t = t 22 def run(self): 23 # 獲取鎖,用於線程同步 24 threadLock.acquire() 25 print(‘打印列表:‘) 26 printList() 27 # 釋放鎖,開始下一個線程 28 threadLock.release() 29 30 threadLock = threading.Lock() 31 the_list = [] 32 33 def loop1(num): 34 for i in num: 35 the_list.append(i) 36 if i ==10: 37 time.sleep(1) 38 def printList(): 39 print(the_list) 40 #創建新線程 41 t = test1(‘線程1‘,2) 42 t1 = test2(‘線程2‘,5) 43 #啟動線程 44 t.start() 45 t1.start()
用鎖同步就不會存在上面的情況了,輸出如下圖所示:
也可以使用with,將上面test1的代碼修改成下面的,執行with裏面的代碼時,會先調用acquire(),執行結束後,調用release()自動釋放鎖。
1 def run(self): 2 with threadLock: 3 print(‘開始修改列表‘ + time.ctime()) 4 # [i for i in range(100)]創建一個[0,1,2...99]的列表 5 loop1([i for i in range(100)]) 6 print(‘結束修改列表‘ + time.ctime())
(十)線程同步(信號量示例)
對操作系統PV操作有了解的,應該很容易理解。不知道也沒關系,舉個例子,有2臺打印機,有4個線程要調用這2臺打印機進行打印。
1、有2臺打印機,這時可用資源 =2,代碼中設置一個計數器(值為2)
2、線程1 、線程2分別調用不同的打印機進行打印(占用資源2,計數器值=2-2),此時線程3和4因為沒有資源,處於阻塞狀態。
3、線程1打印完後,釋放資源(計數器值=0+1)
4、此時線程3或線程4,調用打印機進行打印(計數器值=1-1)
5、。。。等所有線程打印完後,計數器值就恢復初始值(2),表示2臺打印機都處於空閑狀態。
簡單的說,這種方式就是
1、設置一個計數器,指定初始值
2、線程開始的時候調用acquire() (占用資源,計數器的值-1)
3、線程結束的時候調用release() (釋放資源,計數器值+1)
(計數器的值為0時,線程是處於阻塞狀態的)
可以在IDE上運行下面的代碼,看下輸出。
1 import threading 2 import time 3 #添加計數器(設置打印機數量為2) 4 #BoundedSemaphore還有一個作用:計數器的值永遠不會超過初始值 5 printerNum = threading.BoundedSemaphore(2) 6 class test1(threading.Thread): 7 def __init__(self, name, t): 8 threading.Thread.__init__(self) 9 self.name = name 10 self.t = t 11 def run(self): 12 # 計數器獲取鎖,計數器值-1 13 printerNum.acquire() 14 print(self.name+‘:開始執行‘ + time.ctime()) 15 loop1(self.t) 16 print(self.name+‘:結束執行‘ + time.ctime()) 17 # 計數器釋放鎖,計數器值+1 18 printerNum.release() 19 def loop1(t): 20 time.sleep(t) 21 threadName = [‘線程1‘,‘線程2‘,‘線程3‘,‘線程4‘] 22 threadList = [] 23 #創建新線程 24 for tname in threadName: 25 thread = test1(tname,2) 26 if tname == ‘線程2‘: 27 thread = test1(tname, 5) 28 #將線程添加到列表 29 threadList.append(thread) 30 #啟動線程 31 for t in threadList: 32 t.start() 33 #等待所有線程完成 34 for t in threadList: 35 t.join() 36 print(‘線程執行完成‘)
(十一)隊列queue
三種類型:FIFO(先入先出)隊列Queue,LIFO(後入先出)隊列LifoQueue,和優先級隊列 PriorityQueue。
方法 |
說明 |
qsize() |
返回隊列大小 |
empty() |
如果隊列為空,返回True,反之False |
full() |
如果隊列滿了,返回True,反之False |
put(item, block=True,timeout=None) |
將item放進隊列中。block和timeout的作用和下面的get一樣,不過這邊的判斷條件是隊列有空間 |
put_nowait(item) |
和put(item,False)相同 |
get(block=True,timeout=None) |
block和timeout使用默認值:隊列中沒有元素時,阻塞到有可用元素為止 block:設置為Fasle,表示沒元素時不等待,報Empty異常。 timeout:設置一個超時時間,超時隊列還沒元素,報Empty異常 (block為Fasle時,timeout是不生效的) |
get_nowait() |
和get(False)相同 |
task_done() |
表示隊列中某個元素已經執行完畢,該方法會被下面的join()調用 |
join() |
所有元素執行完畢並調用上面的task_done()信號之前,保持阻塞 |
1 import threading 2 import time 3 import queue 4 from random import randint 5 #從隊列取數據 6 class test1(threading.Thread): 7 def __init__(self, name, workQueue): 8 threading.Thread.__init__(self) 9 self.name = name 10 self.workQueue = workQueue 11 def run(self): 12 loop1(self.name,self.workQueue) 13 #往隊列寫數據 14 class test2(threading.Thread): 15 def __init__(self, name, nameList): 16 threading.Thread.__init__(self) 17 self.name = name 18 self.nameList = nameList 19 def run(self): 20 loop2(self.name,self.nameList) 21 def loop1(name,workQueue): 22 #從隊列獲取數據 23 data = workQueue.get() 24 print(name + "從隊列獲取數據:" + data + time.ctime()) 25 time.sleep(randint(2, 5)) 26 def loop2(name,nameList): 27 #將數據存入隊列 28 for n in nameList: 29 workQueue.put(n) 30 print(name + "將數據存入隊列:" + n+time.ctime()) 31 time.sleep(randint(2, 3)) 32 threadName = [‘線程1‘,‘線程2‘,‘線程3‘,‘線程4‘] 33 nameList = [‘手機1‘,‘手機2‘,‘手機3‘,‘手機4‘,‘手機5‘] 34 #創建一個先入先出的隊列,最大值為10 35 workQueue = queue.Queue(10) 36 threadList = [] 37 #創建新線程 38 for tname in threadName: 39 thread = test1(tname,workQueue) 40 if tname == ‘線程2‘: 41 thread = test2(tname, nameList) 42 #將線程添加到列表 43 threadList.append(thread) 44 #啟動線程 45 for t in threadList: 46 t.start() 47 #等待所有線程完成 48 for t in threadList: 49 t.join() 50 print(‘線程執行完成‘)
Python筆記(十一):多線程