python之路 -- 並發編程之線程
進程 是 最小的內存分配單位
線程 是 操作系統調度的最小單位
線程直接被CPU執行,進程內至少含有一個線程,也可以開啟多個線程
開啟一個線程所需要的時間要遠遠小於開啟一個進程
GIL鎖(即全局解釋器鎖) 鎖的是線程
在Cpython解釋器下的python程序 在同一時刻 多個線程中只能有一個線程被CPU執行
1.創建線程的兩中方式:
import time from threading import Thread def func(args): time.sleep(1) print(args) t = Thread(target=func,args=(10,)) #傳入的參數也必須以元組的形式傳 t.start() # 創建線程的另一種方式(使用面向對象創建線程) import time from threading import Thread class MyTread(Thread): def __init__(self,arg): super().__init__() # 調用父類的__init__ self.arg = arg def run(self): # 方法名必須是run time.sleep(1) print(self.arg) t= MyTread(10) t.start()
2.線程與進程效率的比較
import time from threading import Thread from multiprocessing import Process def func(n): n + 1 if __name__ == ‘__main__‘: start1 = time.time() t_lst1 = [] for i in range(100): t = Thread(target=func,args=(i,)) t.start() t_lst1.append(t)for t in t_lst1:t.join() t1 = time.time() - start1 start2 = time.time() t_lst2 = [] for i in range(100): t = Process(target=func, args=(i,)) t.start() t_lst2.append(t) for t in t_lst2: t.join() t2 = time.time() - start2 print(t1,t2) 輸出的結果為: 0.02698826789855957 10.160863876342773 # 可見開啟一個線程所需要的時間要遠遠小於開啟一個進程
多線程用於IO密集型,如socket,爬蟲,處理web請求,讀寫數據庫;
多進程用於計算密集型,如金融分析。
3.多進程與多線程數據共享的比較:
多個線程內部有自己的數據棧,線程內部數據不共享
全局變量在多個線程之間是共享的(線程之間資源共享)
多進程修改全局變量 import os,time from multiprocessing import Process from threading import Thread def func(): global g g -= 10 print(g,os.getpid()) g = 100 # 全局變量g t_lst = [] if __name__ == "__main__": for i in range(5): t = Process(target=func) t.start() t_lst.append(t) for t in t_lst: t.join() print(g) """輸出結果為: 90 143648 90 160300 90 143316 90 160804 90 159348 100 """ 多線程修改全局變量 import os,time from multiprocessing import Process from threading import Thread for i in range(5): t = Thread(target=func) t.start() t_lst.append(t) for t in t_lst : t.join() print(g) """輸出的結果為; 90 161716 80 161716 70 161716 60 161716 50 161716 50 """
4.線程模塊中的其他方法:
1、Thread實例對象的方法(t為實例化的一個線程對象)
t.isAlive():返回線程是否存活。
t.getName():返回線程名。
t.setName():設置線程名。
2、threading模塊提供的一些方法:
threading.active_count():查看程序中存在多少個活著的線程,與len(threading.enumerate())有相同的結果。
threading.enumerate(): 查看程序中的所有線程的線程名和線程號(列表的形式列出每個線程)。正在運行指線程啟動後、結束前,不包括啟動前和終止後的線程。
threading.current_thread():查看當前線程的線程名和線程號
3.實例:
例子: import time import threading def func(n): time.sleep(0.5) print(n,t.getName(),threading.get_ident(),t.isAlive()) # 打印線程名,線程號,判斷線程是否在活動 # 當一個線程執行完成func之後就結束了,t.isAlive()查看為False for i in range(5): t = threading.Thread(target=func,args=(i,)) t.start() t.join() print(threading.active_count()) # 查看程序中存在多少個活著的線程 print(threading.current_thread()) # 查看當前線程的線程名和線程號 print(threading.enumerate()) # 查看程序中的所有線程的線程名和線程號(列表的形式列出每個線程) """執行結果為: 0 Thread-1 33164 True 1 Thread-2 57836 True 2 Thread-3 165280 True 3 Thread-4 171196 True 4 Thread-5 119008 True 1 <_MainThread(MainThread, started 165276)> [<_MainThread(MainThread, started 165276)>] "View Code
5.守護線程
守護進程與守護線程的區別:
1.守護進程隨著主進程代碼的執行結束而結束
2.守護線程會在主線程結束之後再等待其他子線程結束之後才結束
from threading import Thread import time def func(): print(‘我是守護線程‘) time.sleep(0.5) print(‘守護線程結束了‘) def func2(): print(‘我是線程2,非守護線程‘) time.sleep(2) print(‘func2執行完了‘) t = Thread(target=func) t.daemon = True # 仍是加在start()前 t.start() t2 = Thread(target=func2) t2.start() print(‘我是主線程‘) t2.join() print(‘主線程結束了。‘) """執行結果為: 我是守護線程 我是線程2,非守護線程 我是主線程 守護線程結束了 func2執行完了 主線程結束了。 """ # 主進程在執行完自己的代碼之後不會立即結束 而是等待子進程結束之後 再結束。然後回收子進程的資源實例
6.線程鎖
1.不加鎖
from threading import Thread import time def func(): global n time.sleep(0.5) n-=1 n = 10 t_list = [] for i in range(10): t = Thread(target=func) t.start() t_list.append(t) [t.join() for i in t_list] print(n) # 輸出結果n為9 # 創建線程非常快,創建的10個線程都拿到了global的n,此時n為10,然後各線程都做-1操作,得到的結都為9不加鎖
2.加鎖
from threading import Thread from threading import Lock def func(lock): global n lock.acquire() n-=1 lock.release() lock = Lock() n = 10 t_list = [] for i in range(10): t = Thread(target=func,args=(lock,)) t.start() t_list.append(t) [t.join() for i in t_list] print(n) # 輸出的結果為0 通過加鎖後保證了多線程數據安全,但損失了效率加鎖
互斥鎖Lock:在同一個線程或者進程之間,當有兩個acquire的時候,就會產生阻塞(死鎖)
遞歸鎖RLock:在同一個線程或則進程之間,無論acquire多少次都不會產生阻塞(死鎖)
3.多人吃面事例
多人吃面事例: # 有多個人在一個桌子上吃面,只有一份面和一個叉子,當一個人同時拿到了這兩樣才能完成一次吃面的動作。 from threading import Thread,Lock import time def eat1(name): noodle_lock.acquire() print(‘%s拿到面條了‘%name) fork_lock.acquire() print(‘%s拿到叉子了‘%name) print(‘%s吃面‘%name) noodle_lock.release() fork_lock.release() def eat2(name): fork_lock.acquire() print(‘%s拿到叉子了‘%name) time.sleep(1) noodle_lock.acquire() print(‘%s拿到面條啦‘%name) print(‘%s吃面‘%name) noodle_lock.release() fork_lock.release() # 實例化2個鎖,給面條和叉子加上鎖 noodle_lock = Lock() fork_lock = Lock() Thread(target=eat1,args=(‘peo1‘,)).start() Thread(target=eat2,args=(‘peo2‘,)).start() Thread(target=eat1,args=(‘peo3‘,)).start() Thread(target=eat2,args=(‘peo4‘,)).start() """執行結果為: peo1拿到面條了 peo1拿到叉子了 peo1吃面 peo2拿到叉子了 peo3拿到面條了 “陷入阻塞”………… """多人吃面事例(使用互斥鎖):
# 加遞歸鎖 from threading import Thread,RLock import time def eat1(name): noodle_lock.acquire() # 一把鑰匙 print(‘%s拿到面條啦‘%name) fork_lock.acquire() print(‘%s拿到叉子了‘%name) print(‘%s吃面‘%name) fork_lock.release() noodle_lock.release() def eat2(name): fork_lock.acquire() print(‘%s拿到叉子了‘%name) time.sleep(1) noodle_lock.acquire() print(‘%s拿到面條啦‘%name) print(‘%s吃面‘%name) noodle_lock.release() fork_lock.release() fork_lock = noodle_lock = RLock() Thread(target=eat1,args=(‘peo1‘,)).start() Thread(target=eat2,args=(‘peo2‘,)).start() Thread(target=eat1,args=(‘peo3‘,)).start() Thread(target=eat2,args=(‘peo4‘,)).start() # 通過加遞歸鎖實現了數據安全 # 遞歸鎖一般用於有多個數據需要加鎖的時候,不會出現數據安全問題多人吃面事例(使用遞歸鎖)
對於有多個數據需要加鎖的時候,用互斥鎖仍然會出現數據不安全問題
當多線程是對一個數據處理的時候通過給這個數據加上互斥鎖實現了數據安全。
但是當有多個數據被多線程調用處理的時候,加互斥鎖仍然會出現數據安全問題。這時候需要用到遞歸鎖,給多個數據加鎖。
7.線程信號量
# 線程信號量 # 相當於加一個鎖,但此鎖可以根據自己需要設置有多個鑰匙,此時可以允許有多個線程同時去做操作 from threading import Thread,Semaphore import time def func(sem,n): sem.acquire() time.sleep(0.5) print(n,end=‘ ‘) sem.release() sem = Semaphore(4) for i in range(10): t = Thread(target=func,args=(sem,i)) t.start() """輸出的結果為: 2 3 1 0 5 4 7 6 8 9 # (4個一組基本同時輸出) """
8.線程事件
# 事件被創建的時候 # False狀態 # wait() 阻塞 # True狀態 # wait() 非阻塞 # clear 設置狀態為False # set 設置狀態為True
# 起兩個線程 # 第一個線程 : 連接數據庫 # 等待一個信號 告訴我我們之間的網絡是通的 # 連接數據庫 # 第二個線程 : 檢測與數據庫之間的網絡是否連通 # time.sleep(0,2) 2 # 將事件的狀態設置為True from threading import Thread,Event import time,random def connect_db(e): count = 0 while count < 3: e.wait(0.5) # 狀態為False的時候,只等待0.5秒就結束 if e.is_set() == True: print(‘連接數據庫‘) break else: count += 1 print(‘第%s次連接失敗‘ %count) else: raise TimeoutError(‘數據庫連接超時‘) def check_web(e): time.sleep(random.randint(0, 3)) e.set() e = Event() t1 = Thread(target=connect_db, args=(e,)) t2 = Thread(target=check_web, args=(e,)) t1.start() t2.start()
9.線程條件
# notify(int數據類型) 造鑰匙 from threading import Thread from threading import Condition def func(con,i): con.acquire() con.wait() # 等鑰匙 print(‘在第%s個循環裏‘%i) con.release() con = Condition() for i in range(10): Thread(target=func,args = (con,i)).start() while True: num = int(input(‘>>>‘)) con.acquire() con.notify(num) # 造鑰匙 con.release()
輸出結果:
10.線程定時器
定時器,即指定n秒後執行某操作 import time from threading import Timer def func(): print(‘時間同步‘) #1-3 while True: time.sleep(1) t = Timer(5,func).start() # 非阻塞的 # 定時等待5秒之後就調用func # time.sleep(5)
# 自動更新驗證碼 from threading import Thread,Timer import random class Check_number: def __init__(self): self.cash_code() # 程序一開始便實例化一個驗證碼 def make_code(self,n=4): res = ‘‘ for i in range(n): s1 = str(random.randint(0,9)) # 0到9間的任意自然數 s2 = chr(random.randint(65,90)) # 24個小寫字母 res += random.choice([s1,s2]) # 字符和數字的任意組合 return res def cash_code(self,interval=3): self.code = self.make_code() # 實例化一個驗證碼 print(self.code) # 打印驗證碼 self.t = Timer(interval,self.make_code) # 定時器,等待指定時間再運行 self.t.start() def check(self): while True: mes = input(‘輸入驗證碼>>>:‘).strip() if self.code == mes.upper(): print(‘輸入正確!‘) self.t.cancel() # 關閉定時器 break obj = Check_number() obj.check()自動更新驗證碼
11.線程隊列
import queue q1 = queue.Queue() # 隊列 先進先出 q2 = queue.LifoQueue() # 棧 先進後出 q3 = queue.PriorityQueue() # 優先級隊列 q.put() # 往隊列中放入值,當隊列滿的時候陷入等待狀態,直到隊列可以放入值的時候放值 q.get() # 從隊列取值,當隊列為空的時候陷入等待狀態,直到隊列中有值的時候取值 q.put_nowait() # 往隊列中放入值,當隊列滿的時候,直接報錯 q.get_nowait() # 從隊列取值,當隊列為空的時候,直接報錯
優先級隊列事例:
優先級隊列事例: q = queue.PriorityQueue() # 優先級隊列 q.put((20,‘a‘)) q.put((10,‘b‘)) q.put((-5,‘d‘)) q.put((1,‘e‘)) q.put((1,‘f‘)) print(q.get()) print(q.get()) print(q.get()) print(q.get()) """執行結果為: (-5, ‘d‘) (1, ‘e‘) (1, ‘f‘) (10, ‘b‘) """ # 此隊列按照值的優先級的順序來取值,優先級數越小越先取值, # 當優先級一樣的時候,就比較值的ASCII值的大小,小的先取
12.線程池
import time from concurrent.futures import ThreadPoolExecutor def func(n): time.sleep(1) print(n,end=" ") return 2*n def call_back(m): # m接收的為func中return的值 print(‘結果是 %s‘%m.result()) # 從對象中獲取值的方法.result(),進程池中是.get() tpool = ThreadPoolExecutor(max_workers=5) # 默認 不要超過cpu個數*5 for i in range(10): tpool.submit(func,i).add_done_callback(call_back) """執行結果為: 4 結果是 8 2 結果是 4 3 結果是 6 1 結果是 2 0 結果是 0 9 結果是 18 8 結果是 16 6 結果是 12 5 結果是 10 7 結果是 14 """ # tpool.map(func,range(10)) # 拿不到返回值 t_lst = [] for i in range(10): t = tpool.submit(func,i) t_lst.append(t) tpool.shutdown() # shutdown相當於close+join print(‘主線程‘) for t in t_lst:print(‘*‘,t.result(),end=‘ ‘) """執行結果為: 4 2 3 1 0 9 8 7 6 5 主線程 * 0 * 2 * 4 * 6 * 8 * 10 * 12 * 14 * 16 * 18 """
13.多線程實現socket聊天
server 端
import socket from threading import Thread def chat(sk): conn, addr = sk.accept() conn.send("hello,我是服務端".encode(‘utf-8‘)) msg = conn.recv(1024).decode(‘utf-8‘) print(msg) conn.close() sk = socket.socket() sk.bind((‘127.0.0.1‘,8080)) sk.listen() while True: t = Thread(target=chat,args=(sk,)) t.start() sk.closeserver端
client 端
# client端 import socket sk = socket.socket() sk.connect((‘127.0.0.1‘,8080)) msg = sk.recv(1024).decode(‘utf-8‘) print(msg) info = input(‘>>>‘).encode(‘utf-8‘) sk.send(info) sk.close()client端
python之路 -- 並發編程之線程