python進階學習(一)--多線程編程
1. 多線程
- 概念:簡單地說操作系統可以同時執行多個不用程序。例如:一邊用瀏覽器上網,一邊在聽音樂,一邊在用筆記軟件記筆記。
- 並發:指的是任務數多余cpu核數,通過操作系統的各種任務調度算法,實現用多個任務“一起”執行(實際上總有一些任務不在執行,因為切換任務的熟度相當快,看上去一起執行而已)
- 並行:指的是任務數小於等於CPU核數,即任務真的是一起執行的。
2. 線程
- 概念:線程是進程的一個實體,是CPU調度和分派的基本單位。
- threading--單線程執行:
1 import time 2 3 4 def saySorry(): 5 print("親愛的,我錯了,我能吃飯了嗎?") 6 # 時間停頓1秒 7 time.sleep(1) 8 9 10 if __name__ == "__main__": 11 for i in range(5): 12 saySorry()
- threading--多線程執行:
import threading import time def saySorry(): print("親愛的,我錯了,我能吃飯了嗎?") time.sleep(1) if __name__ == "__main__": for i inrange(5): t = threading.Threading(target=saySorry) # 啟動線程,即讓線程開始執行 t.start()
- 單線程與多線程比較
- 單線程要比多線程花費時間多
- 在創建完線程,需要調用start()方法來啟動
- 查看線程數量
1 import threading 2 import time 3 4 5 class MyThread(threading.Thread): 6 def run(self): 7 for i in range(3): 8time.sleep(1) 9 # name 屬性中保存的是當前線程的名字 10 msg = "I‘m" + self.name + ‘@‘ + str(i) 11 print(msg) 12 13 14 if __name__ == "__main__": 15 t = MyThread() 16 t.start() 17 # 通過帶下標索引enumerate()方法 18 length = len(threading.enumerate()) 19 print("當前運行的線程數為:%d" % length)
- 線程執行代碼的封裝:
思考:定義一個新的子類class,只有繼承threading.Thead就可以,然後重寫run方法。
1 import threading 2 import time 3 4 5 class MyThread(threading.Thread): 6 7 8 def run(self): 9 for i in range(3): 10 time.sleep(1) 11 msg = "I‘m" + self.name + ‘@‘ + str(i) # name 屬性中保存的是當前線程的名字 12 print(msg) 13 14 15 if __name__ == "__main__": 16 t = MyThread() 17 t.start()
說明:threading.Thread類有一個run方法,用戶定義線程的功能函數,可以在自己的線程類中覆蓋該方法。而創建自己的線程實例後,通過Thread類的start方法,可以啟動該線程,當該線程獲得執行的機會時,就會調用run方法執行線程。
- 線程的狀態
- 多線程的執行順序是不確定的。當執行到sleep語句時,線程將被阻塞,到sleep結束後,線程進入就緒狀態,等待調度。而線程調度將自行選擇一個線程執行。
- 狀態:
(1) New 創建線程
(2) Runnable 就緒,等待調度
(3) Running 運行。
(4) Blocked 阻塞。阻塞可能在Wait Locked Sleeping
(5) Dead 消亡
- 線程中執行到阻塞,可能有三種情況:
- 同步:線程中獲取同步鎖,但是資源已經被其他線程鎖定時,進入Locked狀態,直到該資源可獲取(獲取的順序由Lock隊列控制)
- 睡眠:線程運行sleep()或join()方法後,線程進入Sleeping狀態。區別在於sleep等待固定的時間,而join是等待子線程執行完。當然join也可以指定一個“超時時間”。從語義上來說,如果兩個線程a,b, 在a中調用b.join(),相當於合並(join)成一個線程。最常見的情況是在主線程中join所有的子線程。
- 等待:線程中執行wait()方法後,線程進入Waiting狀態,等待其他線程的通知(notify)。
- 線程類型
線程有著不同的狀態,也有不同的類型:- 主線程
- 子線程
- 守護線程(後臺線程)
- 前臺線程
- 多線程--共享全局變量問題
1 from threading import Thread 2 import time 3 4 g_num = 100 5 6 7 def work1(): 8 global g_num 9 for i in range(3): 10 g_num += 1 11 print("----in work1, g_num is %d---" % g_num) 12 13 14 def work2(): 15 global g_num 16 print("----in work2, g_num is %d---" % g_num) 17 18 19 print("---線程創建之前g_num is %d---" % g_num) 20 t1 = Thread(target=work1) 21 t1.start() 22 # 延時一會,保證t1線程中的事情做完 23 time.sleep(1) 24 t2 = Thread(target=work2) 25 t2.start()
運行結果:
---線程創建之前g_num is 100---
----in work1, g_num is 103---
----in work2, g_num is 103---
- 共享全局變量問題說明:
- 在一個進程內的所有線程共享全局變量,很方便在多個線程間共享數據。
- 缺點就是,線程是對全局變量隨意更改可能造成多線程之間對全局變量的混亂(即線程非安全)
- 如果多個線程它同時對同一個全局變量操作,會出現資源競爭問題,從而數據結果會不正確。
- 解決方案:
可以通過線程同步來進行解決線程同時修改全局變量的方式,在線程對全局變量進行修改時,都要先上鎖,處理完後再解鎖,在上鎖的整個過程中不允許其他線程訪問,就保證了數據的正確性。
3. 同步與互斥鎖
3.1 同步
- 如果多個線程共同對某個數據修改,則可能出現不可預料的結果,為了保證數據的正確性,需要對多個線程進行同步。
- 使用Tread對象的Lock和Rlock可以實現簡單的線程同步,這兩個對象都有acquire方法和release方法。對於那些需要每次只允許一個線程操作的數據,可以將其操作放到acquire和release方法之間。
3.2 互斥鎖
- 互斥鎖為資源引入一個狀態:鎖定/非鎖定
- 互斥鎖的作用:保證每次只有一個線程進行寫入操作,從而保證了多線程情況下數據的正確性。
- threading 模塊中定義了Loack類,可以方便處理鎖定:
1 import threading 2 import time 3 class MyThread1(threading.Thread): 4 def run(self): 5 if mutexA.acquire(): 6 print(self.name+‘----do1---up----‘) 7 time.sleep(1) 8 if mutexB.acquire(): 9 print(self.name+‘----do1---down----‘) 10 mutexB.release() 11 mutexA.release() 12 class MyThread2(threading.Thread): 13 def run(self): 14 if mutexB.acquire(): 15 print(self.name+‘----do2---up----‘) 16 time.sleep(1) 17 if mutexA.acquire(): 18 print(self.name+‘----do2---down----‘) 19 mutexA.release() 20 mutexB.release() 21 mutexA = threading.Lock() 22 mutexB = threading.Lock() 23 if __name__ == ‘__main__‘: 24 t1 = MyThread1() 25 t2 = MyThread2() 26 t1.start() 27 t2.start() 28 29 ########### 30 ## 創建鎖 31 #mutex = threading.Lock() 32 ##鎖定 33 # acquire 獲得,取得,學到,捕獲。 34 #mutex.acquire([blocking]) 35 ## 釋放 36 #mutex.release()
- 說明:鎖定方法acquirc 可以有一個blocking參數
- 如果設定blocking為True,則當前線程會阻塞,直到獲取到這個鎖為止(如果沒有指定,那麽默認為True)
- 如果設定blocking 為False,則當前線程不會阻塞。
- 上鎖解鎖的過程
- 當一個線程調用鎖的acquire()方法獲得鎖時,鎖就進入“locked” 狀態。
- 每次只有一個線程可以獲得鎖。如果此時另一個線程試圖獲得這個鎖,該線程就會變為“blocked”狀態,稱為“阻塞”,直到擁有鎖的線程調用鎖的release()方法釋放鎖之後,鎖進入“unlocked”狀態。
- 線程調度程序從處於同步阻塞狀態的線程中選擇一個來獲得鎖,並使得該線程進入運行(running)狀態。
- 鎖的好處:
- 確保了某段關鍵代碼只能由一個線程從頭到尾完整地執行
- 鎖的壞處:
- 阻止了多線程並發執行,包含鎖的某段代碼實際上只能以單線程模式執行,效率就大大地下降了
- 由於可以存在多個鎖,不同的線程持有不同的鎖,並試圖獲取對方持有的鎖時,可能會造成死鎖
- 死鎖
定義:在線程間共享多個資源的時候,如果兩個線程分別占有一部分資源並且同時等待對方的資源,就會造成死鎖。
例子:
1 import threading 2 import time 3 class MyThread1(threading.Thread): 4 def run(self): 5 if mutexA.acquire(): 6 print(self.name+‘----do1---up----‘) 7 time.sleep(1) 8 if mutexB.acquire(): 9 print(self.name+‘----do1---down----‘) 10 mutexB.release() 11 mutexA.release() 12 class MyThread2(threading.Thread): 13 def run(self): 14 if mutexB.acquire(): 15 print(self.name+‘----do2---up----‘) 16 time.sleep(1) 17 if mutexA.acquire(): 18 print(self.name+‘----do2---down----‘) 19 mutexA.release() 20 mutexB.release() 21 mutexA = threading.Lock() 22 mutexB = threading.Lock() 23 if __name__ == ‘__main__‘: 24 t1 = MyThread1() 25 t2 = MyThread2() 26 t1.start() 27 t2.start()
- 避免死鎖
- 程序設計時要盡量避免死鎖(銀行家算法)
- 添加超時時間等。
4. 進程
定義:一個程序運行起來後,代碼和用到的資源稱之為進程。它是操作系統分配資源的基本單元。
4.1 進程的狀態
圖分析:
就緒態:運行的條件都已經慢去,正去等待cpu執行。
執行態:cpu正在執行其功能
等待態:等待某些條件滿足,例如一個程序sleep了,此時就處於等待態。
4.2 進程的創建
進程的創建實現例子:
1 from multiprocessing import Process 2 import time 3 def run_proc(): 4 """子進程要執行的代碼""" 5 while True: 6 print("----2----") 7 time.sleep(1) 8 if __name__==‘__main__‘: 9 p = Process(target=run_proc) #創建一個進程 10 p.start() #創建一個Process 實例,用start()方式啟動。 11 while True: 12 print("----1----") 13 time.sleep(1)
- multiprocessing模塊說明:multiprocessing模塊是多跨平臺版本的多進程模塊,提供了一個Process類來代表一個進程對象,這個對象可以理解為是一個獨立的進程,可以執行另外的事情。
- Process語法結構
Process([group [, target [, name [, args [, kwargs]]]]])- target:如果傳遞了函數的引用,可以任務這個子進程就執行這裏的代碼
- args:給target指定的函數傳遞的參數,以元組的方式傳遞
- kwargs:給target指定的函數傳遞命名參數
- name:給進程設定一個名字,可以不設定
- group:指定進程組,大多數情況下用不到
- Process創建的實例對象的常用方法:
- start():啟動子進程實例(創建子進程)
- is_alive():判斷進程子進程是否還在活著
- join([timeout]):是否等待子進程執行結束,或等待多少秒
- terminate():不管任務是否完成,立即終止子進程
- Process創建的實例對象的常用屬性:
- name:當前進程的別名,默認為Process-N,N為從1開始遞增的整數
- pid:當前進程的pid(進程號)
4.3 線程與進程的區別
- 定義的不同
- 進程是系統進行資源分配和調度的一個獨立單位。
- 線程是進程的一個實體,是CPU調度的基本單位。它是比進程更小的能獨立運行的基本單位.線程自己基本上不擁有系統資源,只擁有一點在運行中必不可少的資源(如程序計數器,一組寄存器和棧),但是它可與同屬一個進程的其他的線程共享進程所擁有的全部資源.
- 區別:
- 一個程序至少有一個進程,一個進程至少有一個線程.
- 線程的劃分尺度小於進程(資源比進程少),使得多線程程序的並發性高。
- 進程在執行過程中擁有獨立的內存單元,而多個線程共享內 存,從而極大地提高了程序的運行效率
- 線線程不能夠獨立執行,必須依存在進程中
- 優缺點
線程和進程在使用上各有優缺點:線程執行開銷小,但不利於資源的管理和保護;而進程正相反。
5. 進程間通信--Queue
可以使用multiprocessing模塊的Queue實現多進程之間的數據傳遞,Queue本身是一個消息列隊程序,首先用一個小實例來演示一下Queue的工作原理:
1 from multiprocessing import Queue 2 3 q = Queue(3) # 初始化一個Queue對象,最多可接收三條put消息 4 q.put("消息1") 5 q.put("消息2") 6 print(q.full()) # False 7 q.put("消息3") 8 print(q.full()) # True 9 # 因為消息隊列已滿下面的try 都會拋出異常, 第一個try 會等待2秒後再拋出異常,第二個Try會立刻拋出異常 10 try: 11 q.put("消息4",True,2) 12 except: 13 print("消息隊列已滿,現有消息數量:%s" % q.qsize()) 14 try: 15 q.put_nowait("消息4") 16 except: 17 print("消息隊列已滿,現有消息數量:%s" % q.qsize()) 18 # 推薦的方式,先判斷消息隊列是否已滿,再寫入 19 if not q.full(): 20 q.put_nowait("消息4") 21 # 讀取消息時,先判斷消息隊列是否為空,再讀取 22 if not q.empty(): 23 for i in range(q.qsize()): 24 print(q.get_nowait())
運行結果:
False
True
消息列隊已滿,現有消息數量:3
消息列隊已滿,現有消息數量:3
消息1
消息2
消息3
- 說明:
初始化Queue()對象時(例如:q=Queue()),若括號中沒有指定最大可接收的消息數量,或數量為負值,那麽就代表可接受的消息數量沒有上限(直到內存的盡頭); - Queue.qsize():返回當前隊列包含的消息數量;
- Queue.empty():如果隊列為空,返回True,反之False ;
- Queue.full():如果隊列滿了,返回True,反之False;
- Queue.get([block[, timeout]]):獲取隊列中的一條消息,然後將其從列隊中移除,block默認值為True;
1)如果block使用默認值,且沒有設置timeout(單位秒),消息列隊如果為空,此時程序將被阻塞(停在讀取狀態),直到從消息列隊讀到消息為止,
如果設置了timeout,則會等待timeout秒,若還沒讀取到任何消息,則拋出"Queue.Empty"異常;
2)如果block值為False,消息列隊如果為空,則會立刻拋出"Queue.Empty"異常; - Queue.get_nowait():相當Queue.get(False);
- Queue.put(item,[block[, timeout]]):將item消息寫入隊列,block默認值為True;
1)如果block使用默認值,且沒有設置timeout(單位秒),消息列隊如果已經沒有空間可寫入,此時程序將被阻塞(停在寫入狀態),直到從消息列隊騰出空間為止,如果設置了timeout,則會等待timeout秒,若還沒空間,則拋出"Queue.Full"異常;
2)如果block值為False,消息列隊如果沒有空間可寫入,則會立刻拋出"Queue.Full"異常; - Queue.put_nowait(item):相當Queue.put(item, False);
- Queue.put(item,[block[, timeout]]):將item消息寫入隊列,block默認值為True;
1)如果block使用默認值,且沒有設置timeout(單位秒),消息列隊如果已經沒有空間可寫入,此時程序將被阻塞(停在寫入狀態),直到從消息列隊騰出空間為止,如果設置了timeout,則會等待timeout秒,若還沒空間,則拋出"Queue.Full"異常;
2)如果block值為False,消息列隊如果沒有空間可寫入,則會立刻拋出"Queue.Full"異常; - Queue.put_nowait(item):相當Queue.put(item, False);
Queue實例
在父進程中創建兩個子進程,一個往Queue裏寫數據,一個從Queue裏讀數據
1 from multiprocessing import Process, Queue 2 import os, time, random 3 # 寫數據進程執行的代碼: 4 def write(q): 5 for value in [‘A‘, ‘B‘, ‘C‘]: 6 print(‘Put %s to queue...‘ % value) 7 q.put(value) 8 time.sleep(random.random()) 9 # 讀數據進程執行的代碼: 10 def read(q): 11 while True: 12 if not q.empty(): 13 value = q.get(True) 14 print(‘Get %s from queue.‘ % value) 15 time.sleep(random.random()) 16 else: 17 break 18 if __name__==‘__main__‘: 19 # 父進程創建Queue,並傳給各個子進程: 20 q = Queue() 21 pw = Process(target=write, args=(q,)) 22 pr = Process(target=read, args=(q,)) 23 # 啟動子進程pw,寫入: 24 pw.start() 25 # 等待pw結束: 26 pw.join() 27 # 啟動子進程pr,讀取: 28 pr.start() 29 pr.join() 30 # pr進程裏是死循環,無法等待其結束,只能強行終止: 31 print(‘‘) 32 print(‘所有數據都寫入並且讀完‘)
6. 進程池Pool
針對大量的目標,手動創建進程的工作量巨大,此時就可以用到multiprocessing模塊提供的Pool方法。
Pool過程說明:
初始化Pool時,可以指定一個最大進程數,當有新的請求提交到Pool中時,如果池還沒有滿,那麽就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到指定的最大值,那麽該請求就會等待,直到池中有進程結束,才會用之前的進程來執行新的任務,請看下面的實例:
1 from multiprocessing import Pool 2 import os, time, random 3 def worker(msg): 4 t_start = time.time() 5 print("%s開始執行,進程號為%d" % (msg,os.getpid())) 6 # random.random()隨機生成0~1之間的浮點數 7 time.sleep(random.random()*2) 8 t_stop = time.time() 9 print(msg,"執行完畢,耗時%0.2f" % (t_stop-t_start)) 10 po=Pool(3) #定義一個進程池,最大進程數3 11 for i in range(0,10): 12 #Pool().apply_async(要調用的目標,(傳遞給目標的參數元祖,)) 13 #每次循環將會用空閑出來的子進程去調用目標 14 po.apply_async(worker,(i,)) 15 print("----start----") 16 po.close() #關閉進程池,關閉後po不再接收新的請求 17 po.join() #等待po中所有子進程執行完成,必須放在close語句之後 18 print("-----end-----")
運行結果:
----start----
0開始執行,進程號為21466
1開始執行,進程號為21468
2開始執行,進程號為21467
0 執行完畢,耗時1.01
3開始執行,進程號為21466
2 執行完畢,耗時1.24
4開始執行,進程號為21467
3 執行完畢,耗時0.56
5開始執行,進程號為21466
1 執行完畢,耗時1.68
6開始執行,進程號為21468
4 執行完畢,耗時0.67
7開始執行,進程號為21467
5 執行完畢,耗時0.83
8開始執行,進程號為21466
6 執行完畢,耗時0.75
9開始執行,進程號為21468
7 執行完畢,耗時1.03
8 執行完畢,耗時1.05
9 執行完畢,耗時1.69
-----end-----
- multiprocessing.Pool常用函數解析:
- apply_async(func[, args[, kwds]]) :使用非阻塞方式調用func(並行執行,堵塞方式必須等待上一個進程退出才能執行下一個進程),args為傳遞給func的參數列表,kwds為傳遞給func的關鍵字參數列表;
- close():關閉Pool,使其不再接受新的任務;
- terminate():不管任務是否完成,立即終止;
- join():主進程阻塞,等待子進程的退出, 必須在close或terminate之後使用;
進程池中的Queue
要使用Pool創建進程,就需要使用multiprocessing.Manager()中的Queue(),而不是multiprocesing.Queue(),否則會得到一條如下的錯誤信息:
RuntimeError: Queue objects should only be shared between processes through inheritance.
進程池中的進程通信:
1 # 修改import中的Queue為Manager 2 from multiprocessing import Manager,Pool 3 import os,time,random 4 def reader(q): 5 print("reader啟動(%s),父進程為(%s)" % (os.getpid(), os.getppid())) 6 for i in range(q.qsize()): 7 print("reader從Queue獲取到消息:%s" % q.get(True)) 8 def writer(q): 9 print("writer啟動(%s),父進程為(%s)" % (os.getpid(), os.getppid())) 10 for i in "itcast": 11 q.put(i) 12 if __name__=="__main__": 13 print("(%s) start" % os.getpid()) 14 q = Manager().Queue() # 使用Manager中的Queue 15 po = Pool() 16 # 使用阻塞模式創建進程,這樣就不需要在reader中使用死循環了,可以讓writer完全執行完成後,再用reader去讀取 17 po.apply_async(writer, (q,)) 18 time.sleep(1) # 先讓上面的任務向Queue存入數據,然後再讓下面的任務開始從中取數據 19 po.apply_async(reader, (q,)) 20 po.close() 21 po.join() 22 print("(%s) End" % os.getpid())
運行結果:
(11095) start
writer啟動(11097),父進程為(11095)
reader啟動(11098),父進程為(11095)
reader從Queue獲取到消息:i
reader從Queue獲取到消息:t
reader從Queue獲取到消息:c
reader從Queue獲取到消息:a
reader從Queue獲取到消息:s
reader從Queue獲取到消息:t
(11095) End
python進階學習(一)--多線程編程