1. 程式人生 > >python進階學習(一)--多線程編程

python進階學習(一)--多線程編程

不用 才會 睡眠 關鍵字參數 war 信息 target 函數傳遞 消息隊列

1. 多線程

  • 概念:簡單地說操作系統可以同時執行多個不用程序。例如:一邊用瀏覽器上網,一邊在聽音樂,一邊在用筆記軟件記筆記。
  • 並發:指的是任務數多余cpu核數,通過操作系統的各種任務調度算法,實現用多個任務“一起”執行(實際上總有一些任務不在執行,因為切換任務的熟度相當快,看上去一起執行而已)
  • 並行:指的是任務數小於等於CPU核數,即任務真的是一起執行的。

2. 線程

  • 概念:線程是進程的一個實體,是CPU調度和分派的基本單位。
  • threading--單線程執行:
 1 import time
 2 
 3 
 4 def saySorry():
 5     print("
親愛的,我錯了,我能吃飯了嗎?") 6 # 時間停頓1秒 7 time.sleep(1) 8 9 10 if __name__ == "__main__": 11 for i in range(5): 12 saySorry()

技術分享

  • threading--多線程執行:
import threading
import time


def saySorry():
    print("親愛的,我錯了,我能吃飯了嗎?")
    time.sleep(1)



if __name__ == "__main__":
    for i in
range(5): t = threading.Threading(target=saySorry) # 啟動線程,即讓線程開始執行 t.start()
  • 單線程與多線程比較
    • 單線程要比多線程花費時間多
    • 在創建完線程,需要調用start()方法來啟動
  • 查看線程數量
 1 import threading
 2 import time
 3 
 4 
 5 class MyThread(threading.Thread):
 6     def run(self):
 7         for i in range(3):
 8
time.sleep(1) 9 # name 屬性中保存的是當前線程的名字 10 msg = "I‘m" + self.name + @ + str(i) 11 print(msg) 12 13 14 if __name__ == "__main__": 15 t = MyThread() 16 t.start() 17 # 通過帶下標索引enumerate()方法 18 length = len(threading.enumerate()) 19 print("當前運行的線程數為:%d" % length)
  • 線程執行代碼的封裝:

思考:定義一個新的子類class,只有繼承threading.Thead就可以,然後重寫run方法。

 1 import threading
 2 import time
 3 
 4 
 5 class MyThread(threading.Thread):
 6 
 7 
 8     def run(self):
 9         for i in range(3):
10             time.sleep(1)
11             msg = "I‘m" + self.name + @ + str(i)  # name 屬性中保存的是當前線程的名字
12             print(msg)
13 
14 
15 if __name__ == "__main__":
16     t = MyThread()
17     t.start()

技術分享

說明:threading.Thread類有一個run方法,用戶定義線程的功能函數,可以在自己的線程類中覆蓋該方法。而創建自己的線程實例後,通過Thread類的start方法,可以啟動該線程,當該線程獲得執行的機會時,就會調用run方法執行線程。

  • 線程的狀態
    • 多線程的執行順序是不確定的。當執行到sleep語句時,線程將被阻塞,到sleep結束後,線程進入就緒狀態,等待調度。而線程調度將自行選擇一個線程執行。
    • 狀態:
      (1) New 創建線程
      (2) Runnable 就緒,等待調度
      (3) Running 運行。
      (4) Blocked 阻塞。阻塞可能在Wait Locked Sleeping
      (5) Dead 消亡
  • 線程中執行到阻塞,可能有三種情況:
    • 同步:線程中獲取同步鎖,但是資源已經被其他線程鎖定時,進入Locked狀態,直到該資源可獲取(獲取的順序由Lock隊列控制)
    • 睡眠:線程運行sleep()或join()方法後,線程進入Sleeping狀態。區別在於sleep等待固定的時間,而join是等待子線程執行完。當然join也可以指定一個“超時時間”。從語義上來說,如果兩個線程a,b, 在a中調用b.join(),相當於合並(join)成一個線程。最常見的情況是在主線程中join所有的子線程。
    • 等待:線程中執行wait()方法後,線程進入Waiting狀態,等待其他線程的通知(notify)。
  • 線程類型
    線程有著不同的狀態,也有不同的類型:
    • 主線程
    • 子線程
    • 守護線程(後臺線程)
    • 前臺線程
  • 多線程--共享全局變量問題
 1 from threading import Thread
 2 import time
 3 
 4 g_num = 100
 5 
 6 
 7 def work1():
 8     global g_num
 9     for i in range(3):
10         g_num += 1
11     print("----in work1, g_num is %d---" % g_num)
12 
13 
14 def work2():
15     global g_num
16     print("----in work2, g_num is %d---" % g_num)
17 
18 
19 print("---線程創建之前g_num is %d---" % g_num)
20 t1 = Thread(target=work1)
21 t1.start()
22 # 延時一會,保證t1線程中的事情做完
23 time.sleep(1)
24 t2 = Thread(target=work2)
25 t2.start()

運行結果:

---線程創建之前g_num is 100---
----in work1, g_num is 103---
----in work2, g_num is 103---

  • 共享全局變量問題說明:
    • 在一個進程內的所有線程共享全局變量,很方便在多個線程間共享數據。
    • 缺點就是,線程是對全局變量隨意更改可能造成多線程之間對全局變量的混亂(即線程非安全)
    • 如果多個線程它同時對同一個全局變量操作,會出現資源競爭問題,從而數據結果會不正確。
  • 解決方案:
    可以通過線程同步來進行解決線程同時修改全局變量的方式,在線程對全局變量進行修改時,都要先上鎖,處理完後再解鎖,在上鎖的整個過程中不允許其他線程訪問,就保證了數據的正確性。

3. 同步與互斥鎖

3.1 同步
  • 如果多個線程共同對某個數據修改,則可能出現不可預料的結果,為了保證數據的正確性,需要對多個線程進行同步。
  • 使用Tread對象的Lock和Rlock可以實現簡單的線程同步,這兩個對象都有acquire方法和release方法。對於那些需要每次只允許一個線程操作的數據,可以將其操作放到acquire和release方法之間。
3.2 互斥鎖
  • 互斥鎖為資源引入一個狀態:鎖定/非鎖定
  • 互斥鎖的作用:保證每次只有一個線程進行寫入操作,從而保證了多線程情況下數據的正確性。
  • threading 模塊中定義了Loack類,可以方便處理鎖定:
 1 import threading
 2 import time
 3 class MyThread1(threading.Thread):
 4     def run(self):
 5         if mutexA.acquire():
 6             print(self.name+----do1---up----)
 7             time.sleep(1)
 8             if mutexB.acquire():
 9                 print(self.name+----do1---down----)
10                 mutexB.release()
11             mutexA.release()
12 class MyThread2(threading.Thread):
13     def run(self):
14         if mutexB.acquire():
15             print(self.name+----do2---up----)
16             time.sleep(1)
17             if mutexA.acquire():
18                 print(self.name+----do2---down----)
19                 mutexA.release()
20             mutexB.release()
21 mutexA = threading.Lock()
22 mutexB = threading.Lock()
23 if __name__ == __main__:
24     t1 = MyThread1()
25     t2 = MyThread2()
26     t1.start()
27     t2.start()
28 
29 ###########
30 ## 創建鎖
31 #mutex = threading.Lock()
32 ##鎖定
33 # acquire 獲得,取得,學到,捕獲。
34 #mutex.acquire([blocking])  
35 ## 釋放
36 #mutex.release()
  • 說明:鎖定方法acquirc 可以有一個blocking參數
    • 如果設定blocking為True,則當前線程會阻塞,直到獲取到這個鎖為止(如果沒有指定,那麽默認為True)
    • 如果設定blocking 為False,則當前線程不會阻塞。
  • 上鎖解鎖的過程
    • 當一個線程調用鎖的acquire()方法獲得鎖時,鎖就進入“locked” 狀態。
    • 每次只有一個線程可以獲得鎖。如果此時另一個線程試圖獲得這個鎖,該線程就會變為“blocked”狀態,稱為“阻塞”,直到擁有鎖的線程調用鎖的release()方法釋放鎖之後,鎖進入“unlocked”狀態。
    • 線程調度程序從處於同步阻塞狀態的線程中選擇一個來獲得鎖,並使得該線程進入運行(running)狀態。
  • 鎖的好處:
    • 確保了某段關鍵代碼只能由一個線程從頭到尾完整地執行
  • 鎖的壞處:
    • 阻止了多線程並發執行,包含鎖的某段代碼實際上只能以單線程模式執行,效率就大大地下降了
    • 由於可以存在多個鎖,不同的線程持有不同的鎖,並試圖獲取對方持有的鎖時,可能會造成死鎖
  • 死鎖
    定義:在線程間共享多個資源的時候,如果兩個線程分別占有一部分資源並且同時等待對方的資源,就會造成死鎖。
    例子:
 1 import threading
 2 import time
 3 class MyThread1(threading.Thread):
 4     def run(self):
 5         if mutexA.acquire():
 6             print(self.name+----do1---up----)
 7             time.sleep(1)
 8             if mutexB.acquire():
 9                 print(self.name+----do1---down----)
10                 mutexB.release()
11             mutexA.release()
12 class MyThread2(threading.Thread):
13     def run(self):
14         if mutexB.acquire():
15             print(self.name+----do2---up----)
16             time.sleep(1)
17             if mutexA.acquire():
18                 print(self.name+----do2---down----)
19                 mutexA.release()
20             mutexB.release()
21 mutexA = threading.Lock()
22 mutexB = threading.Lock()
23 if __name__ == __main__:
24     t1 = MyThread1()
25     t2 = MyThread2()
26     t1.start()
27     t2.start()

技術分享

  • 避免死鎖
    • 程序設計時要盡量避免死鎖(銀行家算法)
    • 添加超時時間等。

4. 進程

定義:一個程序運行起來後,代碼和用到的資源稱之為進程。它是操作系統分配資源的基本單元。

4.1 進程的狀態

圖分析:

技術分享

技術分享

就緒態:運行的條件都已經慢去,正去等待cpu執行。
執行態:cpu正在執行其功能
等待態:等待某些條件滿足,例如一個程序sleep了,此時就處於等待態。

4.2 進程的創建

進程的創建實現例子:

 1 from multiprocessing import Process
 2 import time
 3 def run_proc():
 4     """子進程要執行的代碼"""
 5     while True:
 6         print("----2----")
 7         time.sleep(1)
 8 if __name__==__main__:
 9     p = Process(target=run_proc)  #創建一個進程
10     p.start()  #創建一個Process 實例,用start()方式啟動。
11     while True:
12         print("----1----")
13         time.sleep(1)
  • multiprocessing模塊說明:multiprocessing模塊是多跨平臺版本的多進程模塊,提供了一個Process類來代表一個進程對象,這個對象可以理解為是一個獨立的進程,可以執行另外的事情。
  • Process語法結構
    Process([group [, target [, name [, args [, kwargs]]]]])
    • target:如果傳遞了函數的引用,可以任務這個子進程就執行這裏的代碼
    • args:給target指定的函數傳遞的參數,以元組的方式傳遞
    • kwargs:給target指定的函數傳遞命名參數
    • name:給進程設定一個名字,可以不設定
    • group:指定進程組,大多數情況下用不到
    • Process創建的實例對象的常用方法:
    • start():啟動子進程實例(創建子進程)
    • is_alive():判斷進程子進程是否還在活著
    • join([timeout]):是否等待子進程執行結束,或等待多少秒
    • terminate():不管任務是否完成,立即終止子進程
    • Process創建的實例對象的常用屬性:
    • name:當前進程的別名,默認為Process-N,N為從1開始遞增的整數
    • pid:當前進程的pid(進程號)
4.3 線程與進程的區別
  • 定義的不同
    • 進程是系統進行資源分配和調度的一個獨立單位。
    • 線程是進程的一個實體,是CPU調度的基本單位。它是比進程更小的能獨立運行的基本單位.線程自己基本上不擁有系統資源,只擁有一點在運行中必不可少的資源(如程序計數器,一組寄存器和棧),但是它可與同屬一個進程的其他的線程共享進程所擁有的全部資源.
  • 區別:
    • 一個程序至少有一個進程,一個進程至少有一個線程.
    • 線程的劃分尺度小於進程(資源比進程少),使得多線程程序的並發性高。
    • 進程在執行過程中擁有獨立的內存單元,而多個線程共享內 存,從而極大地提高了程序的運行效率
    • 線線程不能夠獨立執行,必須依存在進程中
  • 優缺點
    線程和進程在使用上各有優缺點:線程執行開銷小,但不利於資源的管理和保護;而進程正相反。

5. 進程間通信--Queue

可以使用multiprocessing模塊的Queue實現多進程之間的數據傳遞,Queue本身是一個消息列隊程序,首先用一個小實例來演示一下Queue的工作原理:

 1 from multiprocessing import Queue
 2 
 3 q = Queue(3)  # 初始化一個Queue對象,最多可接收三條put消息
 4 q.put("消息1")
 5 q.put("消息2")
 6 print(q.full())  # False
 7 q.put("消息3")
 8 print(q.full())  # True
 9 # 因為消息隊列已滿下面的try 都會拋出異常, 第一個try 會等待2秒後再拋出異常,第二個Try會立刻拋出異常
10 try:
11     q.put("消息4",True,2)
12 except:
13     print("消息隊列已滿,現有消息數量:%s" % q.qsize())
14 try:
15     q.put_nowait("消息4")
16 except:
17     print("消息隊列已滿,現有消息數量:%s" % q.qsize())
18     # 推薦的方式,先判斷消息隊列是否已滿,再寫入
19 if not q.full():
20     q.put_nowait("消息4")
21     # 讀取消息時,先判斷消息隊列是否為空,再讀取
22 if not q.empty():
23     for i in range(q.qsize()):
24         print(q.get_nowait())

    運行結果:

      False
      True
      消息列隊已滿,現有消息數量:3
      消息列隊已滿,現有消息數量:3
      消息1
      消息2
      消息3
  • 說明:
    初始化Queue()對象時(例如:q=Queue()),若括號中沒有指定最大可接收的消息數量,或數量為負值,那麽就代表可接受的消息數量沒有上限(直到內存的盡頭);
  • Queue.qsize():返回當前隊列包含的消息數量;
  • Queue.empty():如果隊列為空,返回True,反之False ;
  • Queue.full():如果隊列滿了,返回True,反之False;
  • Queue.get([block[, timeout]]):獲取隊列中的一條消息,然後將其從列隊中移除,block默認值為True;
    1)如果block使用默認值,且沒有設置timeout(單位秒),消息列隊如果為空,此時程序將被阻塞(停在讀取狀態),直到從消息列隊讀到消息為止,
    如果設置了timeout,則會等待timeout秒,若還沒讀取到任何消息,則拋出"Queue.Empty"異常;
    2)如果block值為False,消息列隊如果為空,則會立刻拋出"Queue.Empty"異常;
  • Queue.get_nowait():相當Queue.get(False);
  • Queue.put(item,[block[, timeout]]):將item消息寫入隊列,block默認值為True;
    1)如果block使用默認值,且沒有設置timeout(單位秒),消息列隊如果已經沒有空間可寫入,此時程序將被阻塞(停在寫入狀態),直到從消息列隊騰出空間為止,如果設置了timeout,則會等待timeout秒,若還沒空間,則拋出"Queue.Full"異常;
    2)如果block值為False,消息列隊如果沒有空間可寫入,則會立刻拋出"Queue.Full"異常;
  • Queue.put_nowait(item):相當Queue.put(item, False);
  • Queue.put(item,[block[, timeout]]):將item消息寫入隊列,block默認值為True;
    1)如果block使用默認值,且沒有設置timeout(單位秒),消息列隊如果已經沒有空間可寫入,此時程序將被阻塞(停在寫入狀態),直到從消息列隊騰出空間為止,如果設置了timeout,則會等待timeout秒,若還沒空間,則拋出"Queue.Full"異常;
    2)如果block值為False,消息列隊如果沒有空間可寫入,則會立刻拋出"Queue.Full"異常;
  • Queue.put_nowait(item):相當Queue.put(item, False);
Queue實例

在父進程中創建兩個子進程,一個往Queue裏寫數據,一個從Queue裏讀數據

 1 from multiprocessing import Process, Queue
 2 import os, time, random
 3 # 寫數據進程執行的代碼:
 4 def write(q):
 5     for value in [A, B, C]:
 6         print(Put %s to queue... % value)
 7         q.put(value)
 8         time.sleep(random.random())
 9 # 讀數據進程執行的代碼:
10 def read(q):
11     while True:
12         if not q.empty():
13             value = q.get(True)
14             print(Get %s from queue. % value)
15             time.sleep(random.random())
16         else:
17             break
18 if __name__==__main__:
19     # 父進程創建Queue,並傳給各個子進程:
20     q = Queue()
21     pw = Process(target=write, args=(q,))
22     pr = Process(target=read, args=(q,))
23     # 啟動子進程pw,寫入:
24     pw.start()    
25     # 等待pw結束:
26     pw.join()
27     # 啟動子進程pr,讀取:
28     pr.start()
29     pr.join()
30     # pr進程裏是死循環,無法等待其結束,只能強行終止:
31     print(‘‘)
32     print(所有數據都寫入並且讀完)

技術分享

6. 進程池Pool

針對大量的目標,手動創建進程的工作量巨大,此時就可以用到multiprocessing模塊提供的Pool方法。
Pool過程說明:
初始化Pool時,可以指定一個最大進程數,當有新的請求提交到Pool中時,如果池還沒有滿,那麽就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到指定的最大值,那麽該請求就會等待,直到池中有進程結束,才會用之前的進程來執行新的任務,請看下面的實例:

 1 from multiprocessing import Pool
 2 import os, time, random
 3 def worker(msg):
 4     t_start = time.time()
 5     print("%s開始執行,進程號為%d" % (msg,os.getpid()))
 6     # random.random()隨機生成0~1之間的浮點數
 7     time.sleep(random.random()*2) 
 8     t_stop = time.time()
 9     print(msg,"執行完畢,耗時%0.2f" % (t_stop-t_start))
10 po=Pool(3) #定義一個進程池,最大進程數3
11 for i in range(0,10):
12     #Pool().apply_async(要調用的目標,(傳遞給目標的參數元祖,))
13     #每次循環將會用空閑出來的子進程去調用目標
14     po.apply_async(worker,(i,))
15 print("----start----")
16 po.close() #關閉進程池,關閉後po不再接收新的請求
17 po.join() #等待po中所有子進程執行完成,必須放在close語句之後
18 print("-----end-----")

運行結果:

----start----
0開始執行,進程號為21466
1開始執行,進程號為21468
2開始執行,進程號為21467
0 執行完畢,耗時1.01
3開始執行,進程號為21466
2 執行完畢,耗時1.24
4開始執行,進程號為21467
3 執行完畢,耗時0.56
5開始執行,進程號為21466
1 執行完畢,耗時1.68
6開始執行,進程號為21468
4 執行完畢,耗時0.67
7開始執行,進程號為21467
5 執行完畢,耗時0.83
8開始執行,進程號為21466
6 執行完畢,耗時0.75
9開始執行,進程號為21468
7 執行完畢,耗時1.03
8 執行完畢,耗時1.05
9 執行完畢,耗時1.69
-----end-----
  • multiprocessing.Pool常用函數解析:
    • apply_async(func[, args[, kwds]]) :使用非阻塞方式調用func(並行執行,堵塞方式必須等待上一個進程退出才能執行下一個進程),args為傳遞給func的參數列表,kwds為傳遞給func的關鍵字參數列表;
    • close():關閉Pool,使其不再接受新的任務;
    • terminate():不管任務是否完成,立即終止;
    • join():主進程阻塞,等待子進程的退出, 必須在close或terminate之後使用;
進程池中的Queue

要使用Pool創建進程,就需要使用multiprocessing.Manager()中的Queue(),而不是multiprocesing.Queue(),否則會得到一條如下的錯誤信息:

RuntimeError: Queue objects should only be shared between processes through inheritance.

進程池中的進程通信:

 1 # 修改import中的Queue為Manager
 2 from multiprocessing import Manager,Pool
 3 import os,time,random
 4 def reader(q):
 5     print("reader啟動(%s),父進程為(%s)" % (os.getpid(), os.getppid()))
 6     for i in range(q.qsize()):
 7         print("reader從Queue獲取到消息:%s" % q.get(True))
 8 def writer(q):
 9     print("writer啟動(%s),父進程為(%s)" % (os.getpid(), os.getppid()))
10     for i in "itcast":
11         q.put(i)
12 if __name__=="__main__":
13     print("(%s) start" % os.getpid())
14     q = Manager().Queue()  # 使用Manager中的Queue
15     po = Pool()
16     # 使用阻塞模式創建進程,這樣就不需要在reader中使用死循環了,可以讓writer完全執行完成後,再用reader去讀取
17     po.apply_async(writer, (q,))
18     time.sleep(1)  # 先讓上面的任務向Queue存入數據,然後再讓下面的任務開始從中取數據
19     po.apply_async(reader, (q,))
20     po.close()
21     po.join()
22     print("(%s) End" % os.getpid())

運行結果:

(11095) start
writer啟動(11097),父進程為(11095)
reader啟動(11098),父進程為(11095)
reader從Queue獲取到消息:i
reader從Queue獲取到消息:t
reader從Queue獲取到消息:c
reader從Queue獲取到消息:a
reader從Queue獲取到消息:s
reader從Queue獲取到消息:t
(11095) End

python進階學習(一)--多線程編程