1. 程式人生 > >Python筆記(十一):多線程

Python筆記(十一):多線程

st2 pv操作 出現 end 談話 col 隊列大小 == done

(二)和(三)不感興趣的可以跳過,這裏參考了《深入理解計算機系統》第一章和《Python核心編程》第四章

(一) 多線程編程

一個程序包含多個子任務,並且子任務之間相互獨立,讓這些子任務同時運行就是多線程編程。

(二) 進程

進程是操作系統對一個正在運行的程序的一種抽象(或者說進程指的就是運行中的程序)。無論是在單核還是多核系統中,一個CPU看上去都是在並發執行多個進程,實際上這是通過處理器在進程間的切換來實現的,這種切換稱為上下文切換。(下面只討論一個CPU單處理器的情況)

要運行一個新進程時:操作系統總是 1、保存當前進程的上下文。2、恢復新進程的上下文3、將控制權傳遞給新進程,然後新進程開始執行。

這裏說明下上下文的概念:

操作系統保持跟蹤進程運行所需的所有狀態信息。這種狀態,也就是上下文,包括許多信息,比如PC和寄存器文件的當前值,以及主存的內容。

舉個例子(解釋上下文的概念):你正在和張三談話,這時一個電話打過來,可以說你暫停了和張三談話的進程,然後切入新的進程(打電話),等電話打完後,你和張三從剛才停止的地方繼續交流。

這時想想能在剛剛的基礎上繼續交流的前提是什麽,我想應該是你還記得剛剛談話的內容,剛剛說到了什麽地方,然後才能在這個基礎上繼續交談下去。(計算機也是這樣,一個進程暫停的時候,會記住交談的內容、談到了什麽地方(計算機記住的這些東西就稱為上下文(就是當時進程運行時需要的所有狀態信息))。計算機恢復一個進程的時候,就是先恢復進程的上下文(就像你要繼續交流就要先想起剛剛的談話一樣),所以進程間的切換稱為上下文切換)

(三) 線程

在進程執行的過程中,可能有多個分支或多個步驟,例如執行程序A,可能有三個步驟A1、A2、A3,執行A1、A2、A3的過程就是線程。

例如:用戶向服務器發出請求-服務器接收請求-服務器處理請求-服務器返回資源。這時就可以有:

線程1:負責接收用戶的請求,放到一個隊列中。

線程2:處理請求,並提供輸出

線程3:負責返回資源

所以,一個進程實際上是由多個稱為線程的執行單元組成的,每個線程都運行在進程的上下文中。

關於進程和線程,可以將進程理解為1個完整的任務,線程就是一個個子任務。

子任務1+子任務2+子任務3…組成了一個進程。

(四)

Python中多線程

有2個標準庫可以實現多線程,_thread和threading,threading更加先進,有更好的線程支持,推薦使用threading,下面也只對threading進行說明。

threading模塊對象

對象

說明

Thread

表示一個執行線程的對象

Lock

鎖原語對象

Semaphore

為線程間共享的有限資源創建一個計數器,計數器值為0時,線程阻塞

BoundedSemaphore

和Semaphore類似,不過它不允許計數器的值超過初始值

Thread

方法

說明

__init__(self,target=None,
args=(),kwargs={})

實例化一個線程對象,這裏只說明這3個參數的意思,target指函數名,args和kwargs都指要傳給函數的參數(args傳元組,kwargs傳字典),只要指定一個就行了

例如:有一個函數def loop1(name,t),實例化線程的時候,下面2種方式都是可以的
t = threading.Thread(target=loop1,kwargs={‘name‘:線程1‘,‘t‘:5})
t = threading.Thread(target=loop1,args=(線程1‘,5))
start

開始執行線程

run

定義線程功能的方法(一般在繼承threading.Thread的子類中重寫該方法)

join(timeout=None)

等待直到線程終止

接下來:

第五節: 舉個不使用多線程的例子。

第六、七節:說明使用threading.Tread創建多線程的2種方式

第八、九、十:分別說明為什麽要做線程同步、線程同步方式(鎖示例)、線程同步方式(信號量示例)

第十一:說明隊列queue模塊(該模塊提供線程間通信機制,從而讓線程間可以分享數據)

(五) 不使用多線程時的情況(接下來註意不使用多線程和使用多線程執行時間的區別)

1 import time2 def loop1(name,t):
3     print(name+開始時間 + time.ctime())
4     time.sleep(t)
5     print(name+結束時間 + time.ctime())
6 loop1(第一次,2)
7 loop1(第二次,5)

技術分享圖片

接下來對使用Tread創建多線程的2種方式進行說明:

1、創建Tread實例,傳函數。(六)

2、繼承threading.Thread創建子類,並創建子類的實例。(七)

(六) 創建Tread實例(傳函數),然後調用star啟動線程

target指函數名,args指要傳的參數

 1 import threading
 2 import time
 3 def loop1(name,t):
 4     print(name+開始時間 + time.ctime())
 5     time.sleep(t)
 6     print(name+結束時間 + time.ctime())
 7 
 8 #創建新線程
 9 t = threading.Thread(target=loop1,args=(線程1,2))
10 t1 = threading.Thread(target=loop1,args=(線程2,5))
11 #啟動線程
12 t.start()
13 t1.start()

技術分享圖片

(七) 繼承threading.Thread創建子類,實例化後調用star啟動線程

 1 import threading
 2 import time
 3 class test1(threading.Thread):
 4     def __init__(self,name,t):
 5         threading.Thread.__init__(self)
 6         self.name = name
 7         self.t = t
 8     def run(self):
 9         loop1(self.name,self.t)
10 def loop1(name,t):
11     print(name+開始時間 + time.ctime())
12     time.sleep(t)
13     print(name+結束時間 + time.ctime())
14 #創建新線程
15 t = test1(線程1,2)
16 t1 = test1(線程2,5)
17 #啟動線程
18 t.start()
19 t1.start()

技術分享圖片

(八)線程同步(為什麽需要同步)

 1 import threading
 2 import time
 3 class test1(threading.Thread):
 4     def __init__(self,name,t):
 5         threading.Thread.__init__(self)
 6         self.name = name
 7         self.t = t
 8     def run(self):
 9         print(線程1開始修改列表+time.ctime())
10         #[i for i in range(100)]創建一個[0,1,2...99]的列表
11         loop1([i for i in range(100)])
12         print(線程1結束修改列表+time.ctime())
13 class test2(threading.Thread):
14     def __init__(self,name,t):
15         threading.Thread.__init__(self)
16         self.name = name
17         self.t = t
18     def run(self):
19         print(線程2打印列表:)
20         printList()
21 the_list = []
22 def loop1(num):
23    for i in num:
24        the_list.append(i)
25        if i ==10:
26            time.sleep(1)
27 def printList():
28     print(the_list)
29 #創建新線程
30 t = test1(線程1,2)
31 t1 = test2(線程2,5)
32 #啟動線程
33 t.start()
34 t1.start()

看上面這段代碼,線程1調用loop1()函數修改列表the_list,線程2調用pringtList()打印the_list.如果不同步,那麽可能出現loop1()函數修改到一半的時候,線程2就把the_list打印出來了,如下圖所示。

技術分享圖片

(九)線程同步(鎖示例)

接下來說明2種同步原語,鎖和信號量。

通過acquire()獲取鎖,release()釋放鎖。

 1 import threading
 2 import time
 3 class test1(threading.Thread):
 4     def __init__(self,name,t):
 5         threading.Thread.__init__(self)
 6         self.name = name
 7         self.t = t
 8     def run(self):
 9         #獲取鎖,用於線程同步
10         threadLock.acquire()
11         print(開始修改列表+time.ctime())
12         #[i for i in range(100)]創建一個[0,1,2...99]的列表
13         loop1([i for i in range(100)])
14         print(結束修改列表+time.ctime())
15         #釋放鎖,開始下一個線程
16         threadLock.release()
17 class test2(threading.Thread):
18     def __init__(self,name,t):
19         threading.Thread.__init__(self)
20         self.name = name
21         self.t = t
22     def run(self):
23         # 獲取鎖,用於線程同步
24         threadLock.acquire()
25         print(打印列表:)
26         printList()
27         # 釋放鎖,開始下一個線程
28         threadLock.release()
29         
30 threadLock = threading.Lock()
31 the_list = []
32 
33 def loop1(num):
34    for i in num:
35        the_list.append(i)
36        if i ==10:
37            time.sleep(1)
38 def printList():
39     print(the_list)
40 #創建新線程
41 t = test1(線程1,2)
42 t1 = test2(線程2,5)
43 #啟動線程
44 t.start()
45 t1.start()

用鎖同步就不會存在上面的情況了,輸出如下圖所示:

技術分享圖片

也可以使用with,將上面test1的代碼修改成下面的,執行with裏面的代碼時,會先調用acquire(),執行結束後,調用release()自動釋放鎖。

1     def run(self):
2         with threadLock:
3             print(開始修改列表 + time.ctime())
4             # [i for i in range(100)]創建一個[0,1,2...99]的列表
5             loop1([i for i in range(100)])
6             print(結束修改列表 + time.ctime())

(十)線程同步(信號量示例

對操作系統PV操作有了解的,應該很容易理解。不知道也沒關系,舉個例子,有2臺打印機,有4個線程要調用這2臺打印機進行打印。

1、有2臺打印機,這時可用資源 =2,代碼中設置一個計數器(值為2)

2、線程1 、線程2分別調用不同的打印機進行打印(占用資源2,計數器值=2-2),此時線程3和4因為沒有資源,處於阻塞狀態。

3、線程1打印完後,釋放資源(計數器值=0+1)

4、此時線程3或線程4,調用打印機進行打印(計數器值=1-1)

5、。。。等所有線程打印完後,計數器值就恢復初始值(2),表示2臺打印機都處於空閑狀態。

簡單的說,這種方式就是

1、設置一個計數器,指定初始值

2、線程開始的時候調用acquire() (占用資源,計數器的值-1)

3、線程結束的時候調用release() (釋放資源,計數器值+1)

(計數器的值為0時,線程是處於阻塞狀態的)

可以在IDE上運行下面的代碼,看下輸出。

 1 import threading
 2 import time
 3 #添加計數器(設置打印機數量為2)
 4 #BoundedSemaphore還有一個作用:計數器的值永遠不會超過初始值
 5 printerNum = threading.BoundedSemaphore(2)
 6 class test1(threading.Thread):
 7     def __init__(self, name, t):
 8         threading.Thread.__init__(self)
 9         self.name = name
10         self.t = t
11     def run(self):
12         # 計數器獲取鎖,計數器值-1
13         printerNum.acquire()
14         print(self.name+:開始執行 + time.ctime())
15         loop1(self.t)
16         print(self.name+:結束執行 + time.ctime())
17         # 計數器釋放鎖,計數器值+1
18         printerNum.release()
19 def loop1(t):
20     time.sleep(t)
21 threadName = [線程1,線程2,線程3,線程4]
22 threadList = []
23 #創建新線程
24 for tname in threadName:
25     thread = test1(tname,2)
26     if tname == 線程2:
27         thread = test1(tname, 5)
28     #將線程添加到列表
29     threadList.append(thread)
30 #啟動線程
31 for t in threadList:
32     t.start()
33 #等待所有線程完成
34 for t in threadList:
35     t.join()
36 print(線程執行完成)

技術分享圖片

(十一)隊列queue

三種類型:FIFO(先入先出)隊列Queue,LIFO(後入先出)隊列LifoQueue,和優先級隊列 PriorityQueue。

方法

說明

qsize()

返回隊列大小

empty()

如果隊列為空,返回True,反之False

full()

如果隊列滿了,返回True,反之False

put(item, block=True,timeout=None)

將item放進隊列中。block和timeout的作用和下面的get一樣,不過這邊的判斷條件是隊列有空間

put_nowait(item)

和put(item,False)相同

get(block=True,timeout=None)

block和timeout使用默認值:隊列中沒有元素時,阻塞到有可用元素為止

block:設置為Fasle,表示沒元素時不等待,報Empty異常。

timeout:設置一個超時時間,超時隊列還沒元素,報Empty異常

(block為Fasle時,timeout是不生效的)

get_nowait()

和get(False)相同

task_done()

表示隊列中某個元素已經執行完畢,該方法會被下面的join()調用

join()

所有元素執行完畢並調用上面的task_done()信號之前,保持阻塞

下面舉個例子,線程2(生產者)往隊列中插入數據,線程1、3、4(消費者)從隊列中取數據。(生產者-消費者問題)
 1 import threading
 2 import time
 3 import queue
 4 from random import randint
 5 #從隊列取數據
 6 class test1(threading.Thread):
 7     def __init__(self, name, workQueue):
 8         threading.Thread.__init__(self)
 9         self.name = name
10         self.workQueue = workQueue
11     def run(self):
12         loop1(self.name,self.workQueue)
13 #往隊列寫數據
14 class test2(threading.Thread):
15     def __init__(self, name, nameList):
16         threading.Thread.__init__(self)
17         self.name = name
18         self.nameList = nameList
19     def run(self):
20         loop2(self.name,self.nameList)
21 def loop1(name,workQueue):
22     #從隊列獲取數據
23     data = workQueue.get()
24     print(name + "從隊列獲取數據:" + data + time.ctime())
25     time.sleep(randint(2, 5))
26 def loop2(name,nameList):
27     #將數據存入隊列
28     for n in nameList:
29         workQueue.put(n)
30         print(name + "將數據存入隊列:" + n+time.ctime())
31         time.sleep(randint(2, 3))
32 threadName = [線程1,線程2,線程3,線程4]
33 nameList = [手機1,手機2,手機3,手機4,手機5]
34 #創建一個先入先出的隊列,最大值為10
35 workQueue = queue.Queue(10)
36 threadList = []
37 #創建新線程
38 for tname in threadName:
39     thread = test1(tname,workQueue)
40     if tname == 線程2:
41         thread = test2(tname, nameList)
42     #將線程添加到列表
43     threadList.append(thread)
44 #啟動線程
45 for t in threadList:
46     t.start()
47 #等待所有線程完成
48 for t in threadList:
49     t.join()
50 print(線程執行完成)
技術分享圖片


Python筆記(十一):多線程