1. 程式人生 > >20190102(多線程,守護線程,線程互斥鎖,信號量,JoinableQueue)

20190102(多線程,守護線程,線程互斥鎖,信號量,JoinableQueue)

車間 set nbsp 線程互斥 queue lease start 互斥 property

多線程

多進程: 核心是多道技術,本質上就是切換加保存技術。 當進程IO操作較多,可以提高程序效率。 每個進程都默認有一條主線程。

多線程: 程序的執行線路,相當於一條流水線,其包含了程序的具體執行步驟。 操作系統是工廠,進程就是車間,線程就是流水線。 同一個進程的線程PID相同

線程和進程的關系: 進程包含了運行程序的所有資源,同一進程內的線程們共享該資源。不同進程內的線程資源是隔離的。 進程是一個資源單位,線程是CPU的最小執行單位! 每一個進程一旦被創建,就默認開啟了一條線程,該線程稱之為主線程。 一個進程可以包含多個線程。進程包含線程,線程依賴進程。

為什麽使用線程: 提高程序效率,進程對操作系統的資源占用較高。 線程是如何提高效率的: 默認情況下,每個進程有且只有一條主線程,執行代碼時如果遇到IO,系統就會切換到其他應用程序(其實是切換線程),這會降低當前應用程序的效率,CPU切換是在不同線程之間進行切換,多線程可以使CPU在一個進程內切換,從而提高程序對CPU的占用率。

什麽時候啟用多線程: 當程序遇到IO時 但如果程序是純計算時,使用多線程無法提高效率。

進程和線程的區別: 進程對於操作系統的資源占用非常高,而線程比較低(是進程的十分之一到百分之一) 同一個進程內的線程共享該進程的資源

如何使用多線程: 開啟線程的兩種方式: 1、實例化Thread類 2、繼承Thread類,覆蓋run方法

# 實例化Thread
from threading import Thread
?
def task():
print("threading run")
t1 = Thread(target=task) # 形式與進程類似
t1.start() # 手動啟動線程,不用像進程那樣放到main下
print("over") # 子線程可能更快,也可能慢
?
# 繼承Thread類,覆蓋run方法
class MyThread(Thread):
def run(self):
print("子線程 run")
?
MyThread().start()
print("over")

線程與進程的資源占用對比:

from multiprocessing import Process
from threading import Thread
import time
?
?
def task():
print("子進程run")
?
if __name__ == ‘__main__‘:
start = time.time()
ps = []
for i in range(100):
p = Process(target=task)
p.start()
ps.append(p)
for p in ps:
p.join() # 這一步是為了讓所有子進程都執行完畢後在執行主進程,進而得到準確的運行時間
print(time.time()-start) #我的電腦用了74s
?
start = time.time()
ts = []
for i in range(100):
t = Thread(target=task)
t.start()
ts.append(t)
for t in ts:
t.join() # 子線程都結束後再執行主線程
print(time.time() - start) # 0.05s 效率提升了上千倍

同一進程內的線程資源共享

from threading import Thread
import time
?
x = 100
def task():
global x
x = 0
t = Thread(target=task)
t.start()
time.sleep(0.1) # 子線程先運行
print(x) # 輸出為0
?
from threading import Thread
import time
?
x = 100
def task():
global x
time.sleep(0.1) # 主線程先運行
x = 0
t = Thread(target=task)
t.start()
print(x) # 輸出為100

守護線程

守護線程:在所有非守護線程結束後結束的線程。 一個程序中可以有多個守護線程,非守護線程都結束後,多個守護進程一起死。

# 調節下面的兩處時間,可以得到不同的結果
# 兩處時間都取消時,守護進程可能也會完全打印,主要是因為主線程在打印結束後需要時間去結束(並不是說只要主線程打印結束立馬主線程就完全結束,結束也是需要時間的),而線程速度極快,所以可能完全執行。當然,也可能部分執行,這取決於CPU的運行情況。
from threading import Thread
import time
?
def task():
print("子線程運行。。。")
# time.sleep(1)
print("子線程運行。。。")
t = Thread(target=task)
t.setDaemon(True) # 要放到子線程開始前,註意進程的守護進程是deamon,線程的守護線程是setDeamon
t.start()
# time.sleep(0.1)
print("over")

線程中常用屬性

from threading import Thread,current_thread,enumerate,active_count
?
import os
?
def task():
print("running...")
print("子線程",os.getpid()) # 主線程和子線程PID相同
print(current_thread()) # 獲取當前線程對象
print(active_count()) # 獲取正在運行的線程個數
?
t1 = Thread(target=task)
t1.start()
print("主線程",os.getpid())
?
print(t1.is_alive()) # t1線程是否還在運行
print(t1.isAlive()) # 同上,只是兩種不同寫法,官方文檔中顯示一模一樣:isAlive = is_alive
print(t1.getName()) # 獲取子線程t1的線程名稱(Thread-1)(即第一個子線程)
?
print(enumerate()) # 獲取所有線程對象列表
輸出結果:
[<_MainThread(MainThread, started 19516)>, <Thread(Thread-1, started 9072)>]

線程互斥鎖

當多個線程需要同時修改同一份數據時,可能會造成數據錯亂,此時需要互斥鎖。(類似於進程互斥鎖)

不用線程互斥鎖的情況:

a = 100
from threading import Thread
import time
def task():
global a
temp = a - 1
time.sleep(0.01) # 強行增加子線程運行時間,由於沒有互斥鎖,主線程的a會錯亂
a = temp # 故意增加步驟,延長線程運行時間
for i in range(100):
t = Thread(target=task)
t.start()
?
print(a) # 輸出結果可能為98或99等等(都有可能)

解決辦法,加入線程互斥鎖,並且添加join確保子線程全部運行結束:

a = 100
from threading import Thread,Lock
import time
lock = Lock()
def task():
lock.acquire() # 與進程互斥鎖一致
global a
temp = a - 1
time.sleep(0.01)
a = temp
lock.release() # 拿到一次鎖就必須釋放一次
ts = []
for i in range(100):
t = Thread(target=task)
t.start()
ts.append(t)
for t in ts:
t.join() # 確保所有子進程都結束
?
# t.join() # 使用此方法不行,因為這樣只能等待最後一個子線程,但是無法保證最後一個子線程是最後執行的,如果最後一個子線程卻先於一些線程執行完畢,那麽還是會出問題。
print(a) # 結果為0

信號量

信號量也是一種鎖,特點是可以設置一個數據可以被幾個線程(進程)共享。

與Lock的區別:
Lock鎖,數據同時只能被一個線程使用
信號量,可以指定數據同時被多個線程使用
?
使用場景:
限制一個數據被同時訪問的次數,保證程序/操作系統的穩定
比如一臺電腦設置共享,限制連接的終端,保證CPU/磁盤不至於過載導致錯亂或崩潰

例子:

from threading import Semaphore,Thread,current_thread
import time
?
sem = Semaphore(3) # 信號量設定為3,一份數據最多允許三份子線程訪問
?
def task():
sem.acquire() # 和鎖的用法類似
print("%s run" %current_thread())
time.sleep(1)
sem.release()
?
for i in range(10):
t = Thread(target=task)
t.start()

輸出結果:(三個三個一輸出)
<Thread(Thread-1, started 13944)> run
<Thread(Thread-2, started 27096)> run
<Thread(Thread-3, started 26108)> run
<Thread(Thread-4, started 20580)> run
<Thread(Thread-5, started 20304)> run
<Thread(Thread-6, started 10676)> run
<Thread(Thread-8, started 18256)> run
<Thread(Thread-7, started 22684)> run
<Thread(Thread-9, started 18532)> run
<Thread(Thread-10, started 7036)> run

生產者消費者+守護進程

應用:生產者和消費者通過隊列交換數據,都結束後程序再結束。

這裏先補充一個其他知識:

進程隊列的另一個類JoinableQueue

JoinableQueue同樣通過multiprocessing使用。

創建隊列的另外一個類:

JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列允許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。

參數介紹:

maxsize是隊列中允許最大項數,省略則無大小限制。

方法介紹:

JoinableQueue的實例p除了與Queue對象相同的方法之外還具有:

q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。如果調用此方法的次數大於從隊列中刪除項目的數量,將引發ValueError異常

q.join():生產者調用此方法進行阻塞,直到隊列中所有的項目均被處理。阻塞將持續到隊列中的每個項目均調用q.task_done()方法為止

簡言之:(如果還不理解看最下面的官方文檔)

task_done()表示隊列中的某個任務已經完成了,會返回給join一個信號。 join()會一直阻塞直到隊列中的所有任務已經被獲取並處理。一旦任務被加入隊列,任務計數會加1,一旦收到task_done()的返回信號,任務計數會減1,如果任務計數減為0,則join()解除阻塞,程序也就能繼續運行了。

import time,random
from multiprocessing import JoinableQueue,Process
?
?
def eat_hotdog(name,q):
while True: # 因為是守護進程,所以只要主進程結束,該循環也就結束了。
res = q.get()
print("%s 在吃 %s" %(name,res))
time.sleep(random.randint(1,2))
q.task_done() # 記錄當前已經被處理的數據的數量,該方法是JoinableQueue下的方法。
?
?
def make_hotdog(name,q):
for i in range(1,5):
res = "第%s熱狗" %i
print("%s 生產了 %s" % (name, res))
q.put(res)
?
?
if __name__ == ‘__main__‘:
q = JoinableQueue()
p1 = Process(target=make_hotdog,args=("徐福記熱狗店",q))
p1.start()
?
p3 = Process(target=make_hotdog,args=("萬達熱狗店",q))
p3.start()
?
p2 = Process(target=eat_hotdog,args=("思聰",q))
p2.daemon = True # 此處加入守護進程,確保主程序結束後,該進程的while循環也會結束
p2.start()
?
# 生產者全部生產完成
p1.join()
p3.join()
?
# 保證隊列為空,全部被處理
q.join() # 該join方法是JoinableQueue下的方法,並不是上面的那種join方法

輸出結果:
徐福記熱狗店 生產了 第1熱狗
徐福記熱狗店 生產了 第2熱狗
徐福記熱狗店 生產了 第3熱狗
徐福記熱狗店 生產了 第4熱狗
萬達熱狗店 生產了 第1熱狗
萬達熱狗店 生產了 第2熱狗
萬達熱狗店 生產了 第3熱狗
萬達熱狗店 生產了 第4熱狗
思聰 在吃 第1熱狗
思聰 在吃 第2熱狗
思聰 在吃 第3熱狗
思聰 在吃 第4熱狗
思聰 在吃 第1熱狗
思聰 在吃 第2熱狗
思聰 在吃 第3熱狗
思聰 在吃 第4熱狗

官方文檔:

JoinableQueue是由Queue派生出來的一個類,與Queue不同,JoinableQueue有task_done()和Join()方法。

task_done()
Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done()tells the queue that the processing on the task is complete.
If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
Raises a ValueError if called more times than there were items placed in the queue.
?
join()
Block until all items in the queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.

20190102(多線程,守護線程,線程互斥鎖,信號量,JoinableQueue)