並發編程 - 線程 - 1.互斥鎖/2.GIL解釋器鎖/3.死鎖與遞歸鎖/4.信號量/5.Event事件/6.定時器
阿新 • • 發佈:2018-04-04
級別 src 總結 alex post strip CQ bsp 回收機制
1.互斥鎖:
原理:將並行變成串行
精髓:局部串行,只針對共享數據修改
保護不同的數據就應該用不用的鎖
1 from threading import Thread, Lock 2 import time 3 4 n = 100 5 6 def task(): 7 global n 8 mutex.acquire() # 效率低了 但是數據安全了 9 temp = n 10 time.sleep(0.1) # 100個線程 都拿到了100 所以就是 100個線程100-1 11 n = temp - 1 12 mutex.release()13 14 15 if __name__ == ‘__main__‘: 16 mutex = Lock() 17 t_l = [] 18 for i in range(100): 19 t = Thread(target=task) 20 t_l.append(t) 21 t.start() 22 23 for t in t_l: 24 t.join() 25 26 print(‘主‘, n) 27 """ 28 主 99 原因: 100個線程 都拿到了100 所以就是 100個線程100-1 數據不安全 效率高但是不安全29 要將並行改為串行 30 """ 31 """ 32 主 0 原因:效率低了 但是數據安全了 33 """
2.GIL: global interpreter lock
python3 test.py
ps aux | grep test # linux
tasklist | findstr python # windows python.exe
運行python 會有幾步:
1.會有一個進程,進程內存空間 python解釋器的代碼先加載到內存空間
2.test.py 內容加載到內存
3.解釋執行;代碼交給了python解釋器
線程幹活指向了python代碼 python代碼當作參數傳給了解釋器
線程拿到解釋器的代碼,拿著python代碼當作參數,執行
垃圾回收線程運行解釋器的代碼
垃圾回收線程和某一個線程沖突了,數據不安全,
開多個進程,GIL就沒影響了, cpython解釋器垃圾回收線程定期啟動一個
GIL:互斥鎖,保證數據的安全 對CPython解釋器,同一時間只有一個線程運行
GIL.acquire() 這樣垃圾線程和線程就不會沖突了,這樣回收機制就變得安全了
GIL.release()
python解釋器,多線程有GIL存在,保證了一個進程下面多個線程的執行是一個一個執行的
有GIL與自動的鎖的工作原理:
總結:
1.GIL 一個進程內的多個線程同一時間只能運行一個線程,垃圾回收線程是安全的
2.針對不同的數據,就應該加不同的鎖,解釋器級別的GIL鎖,只能保護解釋器級別的數據,
不能保護自己的數據,針對自己的共享數據還要加鎖;
線程首先搶的是;GIL鎖,之後才是mutex
官網:
結論:在Cpython解釋器中,同一個進程下開啟的多線程,同一時刻只能有一個線程執行,無法利用多核優勢
GIL的存在:同一時刻,只能有一個線程在運行
多核,多進程,但進程開銷大,多線程,又不能用多核 ?
cpu幹計算的,多個cpu
1.如果是幹計算的操作,多核省時間
2.如果幹IO阻塞型操作,多核沒用
程序運行:都會幹計算和IO操作
四個任務:
1.1個核:開多線程 ,因為多進程能用上多核
2.多核:
計算密集型:用多進程,用多核,eg:金融行業的,計算比較多,雖然多進程開銷大,但多核,保證了計算快
IO密集型:用多線程,同一時間只能用一個核,1個核一個進程,多線程就在一個核上來回切和四個核來回切是一樣的
現在寫的軟件:
網絡打交道,網絡的IO
IO密集型,用多線程
1 """ 2 計算密集型應該用: 多進程 效率高 3 """ 4 from multiprocessing import Process 5 from threading import Thread 6 import os,time 7 8 def work(): 9 res=0 10 for i in range(100000000): 11 res*=i 12 13 14 if __name__ == ‘__main__‘: 15 l=[] 16 print(os.cpu_count()) #本機為8核 17 start=time.time() 18 for i in range(8): 19 # p=Process(target=work) #耗時8s多 20 p=Thread(target=work) #耗時37s多 21 l.append(p) 22 p.start() 23 for p in l: 24 p.join() 25 stop=time.time() 26 print(‘run time is %s‘ %(stop-start)) 27 28 """ 29 IO密集型:多線程 效率高 30 """ 31 from multiprocessing import Process 32 from threading import Thread 33 import threading 34 import os,time 35 def work(): 36 time.sleep(2) 37 print(‘===>‘) 38 39 if __name__ == ‘__main__‘: 40 l=[] 41 print(os.cpu_count()) #本機為8核 42 start=time.time() 43 for i in range(400): 44 # p=Process(target=work) #耗時8s多,大部分時間耗費在創建進程上 45 p=Thread(target=work) #耗時2s多 46 l.append(p) 47 p.start() 48 for p in l: 49 p.join() 50 stop=time.time() 51 print(‘run time is %s‘ %(stop-start))
3.死鎖:
你拿著我的鎖,我拿著你的鎖
互斥鎖:Lock()
互斥鎖只能acquire一次
遞歸鎖:RLock()
可以連續acquire多次,每acquire一次計數器+1,
只有計數為0時,才能被搶到acquire
1 from threading import Thread,Lock 2 import time 3 4 mutexA=Lock() 5 mutexB=Lock() 6 7 class MyThread(Thread): 8 def run(self): 9 self.f1() 10 self.f2() 11 12 def f1(self): 13 mutexA.acquire() 14 print(‘%s 拿到了A鎖‘ %self.name) 15 16 mutexB.acquire() 17 print(‘%s 拿到了B鎖‘ %self.name) 18 mutexB.release() 19 20 mutexA.release() 21 22 23 def f2(self): 24 mutexB.acquire() 25 print(‘%s 拿到了B鎖‘ % self.name) 26 time.sleep(0.1) 27 28 mutexA.acquire() 29 print(‘%s 拿到了A鎖‘ % self.name) 30 mutexA.release() 31 32 mutexB.release() 33 34 if __name__ == ‘__main__‘: 35 for i in range(10): 36 t=MyThread() 37 t.start() 38 """ 39 Thread-1 拿到了A鎖 # 死鎖了 卡住了 40 Thread-1 拿到了B鎖 41 Thread-1 拿到了B鎖 42 Thread-2 拿到了A鎖 43 """
1 # 互斥鎖只能acquire一次 2 # from threading import Thread,Lock 3 # 4 # mutexA=Lock() 5 # 6 # mutexA.acquire() 7 # mutexA.release() 8 9 # 遞歸鎖:可以連續acquire多次,每acquire一次計數器+1,只有計數為0時,才能被搶到acquire 10 from threading import Thread,RLock 11 import time 12 13 mutexB=mutexA=RLock() 14 15 class MyThread(Thread): 16 def run(self): 17 self.f1() 18 self.f2() 19 20 def f1(self): 21 mutexA.acquire() 22 print(‘%s 拿到了A鎖‘ %self.name) 23 24 mutexB.acquire() 25 print(‘%s 拿到了B鎖‘ %self.name) 26 mutexB.release() 27 28 mutexA.release() 29 30 31 def f2(self): 32 mutexB.acquire() 33 print(‘%s 拿到了B鎖‘ % self.name) 34 time.sleep(2) 35 36 mutexA.acquire() 37 print(‘%s 拿到了A鎖‘ % self.name) 38 mutexA.release() 39 40 mutexB.release() 41 42 if __name__ == ‘__main__‘: 43 for i in range(10): 44 t=MyThread() 45 t.start() 46 """ 47 Thread-1 拿到了A鎖 # 解決了 死鎖 48 Thread-1 拿到了B鎖 49 Thread-1 拿到了B鎖 50 Thread-1 拿到了A鎖 51 Thread-2 拿到了A鎖 52 Thread-2 拿到了B鎖 53 Thread-2 拿到了B鎖 54 Thread-2 拿到了A鎖 55 Thread-4 拿到了A鎖 56 Thread-4 拿到了B鎖 57 Thread-5 拿到了A鎖 58 Thread-5 拿到了B鎖 59 Thread-5 拿到了B鎖 60 Thread-5 拿到了A鎖 61 Thread-7 拿到了A鎖 62 Thread-7 拿到了B鎖 63 Thread-7 拿到了B鎖 64 Thread-7 拿到了A鎖 65 Thread-9 拿到了A鎖 66 Thread-9 拿到了B鎖 67 Thread-9 拿到了B鎖 68 Thread-9 拿到了A鎖 69 Thread-3 拿到了A鎖 70 Thread-3 拿到了B鎖 71 Thread-3 拿到了B鎖 72 Thread-3 拿到了A鎖 73 Thread-6 拿到了A鎖 74 Thread-6 拿到了B鎖 75 Thread-6 拿到了B鎖 76 Thread-6 拿到了A鎖 77 Thread-10 拿到了A鎖 78 Thread-10 拿到了B鎖 79 Thread-10 拿到了B鎖 80 Thread-10 拿到了A鎖 81 Thread-8 拿到了A鎖 82 Thread-8 拿到了B鎖 83 Thread-8 拿到了B鎖 84 Thread-8 拿到了A鎖 85 Thread-4 拿到了B鎖 86 Thread-4 拿到了A鎖 87 """
4.信號量
信號量也是一把鎖,可以指定信號量為5,對比互斥鎖同一時間只能有一個任務搶到鎖去執行,
信號量同一時間可以有5個任務拿到鎖去執行
信號量:同一時間有多個線程在進行
1 from threading import Thread,Semaphore,currentThread 2 import time,random 3 4 sm=Semaphore(1) 5 6 def task(): 7 # sm.acquire() 8 # print(‘%s in‘ %currentThread().getName()) 9 # sm.release() 10 with sm: # 類似於sm.acquire() # 同一時間可以來3個人,1個人,或者2個人 11 print(‘%s in‘ %currentThread().getName()) 12 time.sleep(random.randint(1,3)) 13 14 15 if __name__ == ‘__main__‘: 16 for i in range(10): 17 t=Thread(target=task) 18 t.start() 19 """ 20 Thread-1 in 21 Thread-2 in 22 Thread-3 in 23 24 Thread-4 in 25 26 27 Thread-6 in 28 Thread-5 in 29 Thread-7 in 30 31 32 Thread-8 in 33 Thread-9 in 34 35 Thread-10 in 36 """
5.Event:
多個線程之間同步的,一個線程告訴另一些線程可以做其他的活了
event.wait()
event.wait(2)
event.set()
event.is_set()
event.clear()
1 from threading import Thread,Event 2 import time 3 4 event=Event() 5 # event.wait() # 等 ...直到 set 6 # event.set() 7 8 9 def student(name): 10 print(‘學生%s 正在聽課‘ %name) 11 # event.wait() # 學生要等7秒 才能下課 12 event.wait(2) # 學生等2秒 直接下課了 13 14 print(‘學生%s 課間活動‘ %name) 15 16 17 def teacher(name): 18 print(‘老師%s 正在授課‘ %name) 19 time.sleep(7) 20 event.set() 21 22 23 if __name__ == ‘__main__‘: 24 stu1=Thread(target=student,args=(‘alex‘,)) 25 stu2=Thread(target=student,args=(‘wxx‘,)) 26 stu3=Thread(target=student,args=(‘yxx‘,)) 27 t1=Thread(target=teacher,args=(‘egon‘,)) 28 29 stu1.start() 30 stu2.start() 31 stu3.start() 32 t1.start() 33 34 35 # ------------------ 36 # 設置鏈接的超時時間 37 from threading import Thread,Event,currentThread 38 import time 39 40 event=Event() 41 42 def conn(): 43 # print(‘%s is connecting‘%currentThread().getName()) 44 # event.wait() 45 # print(‘%s is connected‘%currentThread().getName()) 46 47 n=0 48 while not event.is_set(): 49 if n == 3: 50 print(‘%s try too many times‘ %currentThread().getName()) 51 return 52 print(‘%s try %s‘ %(currentThread().getName(),n)) 53 event.wait(0.5) 54 n+=1 55 56 print(‘%s is connected‘ %currentThread().getName()) 57 58 59 def check(): 60 print(‘%s is checking‘ %currentThread().getName()) 61 time.sleep(5) 62 event.set() 63 64 65 if __name__ == ‘__main__‘: 66 for i in range(3): 67 t=Thread(target=conn) 68 t.start() 69 t=Thread(target=check) 70 t.start() 71 """ 72 Thread-1 try 0 73 Thread-2 try 0 74 Thread-3 try 0 75 Thread-4 is checking 76 Thread-3 try 1 77 Thread-2 try 1 78 Thread-1 try 1 79 Thread-3 try 2 80 Thread-1 try 2 81 Thread-2 try 2 82 Thread-3 try too many times 83 Thread-2 try too many times 84 Thread-1 try too many times 85 """
6.定時器:Timer
t=Timer(5,task,args=(‘egon‘,))
t.start()
t.cancel()
1 from threading import Timer 2 3 def task(name): 4 print(‘hello %s‘ %name) 5 6 t=Timer(5,task,args=(‘egon‘,)) # 就是起了一個線程 7 t.start() 8 9 # ---------------------- 10 from threading import Timer 11 import random 12 13 class Code: 14 def __init__(self): 15 self.make_cache() 16 17 def make_cache(self,interval=10): 18 self.cache=self.make_code() 19 print(self.cache) 20 self.t=Timer(interval,self.make_cache) 21 self.t.start() 22 23 def make_code(self,n=4): 24 res=‘‘ 25 for i in range(n): 26 s1=str(random.randint(0,9)) 27 s2=chr(random.randint(65,90)) 28 res+=random.choice([s1,s2]) 29 return res 30 31 def check(self): 32 while True: 33 code=input(‘請輸入你的驗證碼>>: ‘).strip() 34 if code.upper() == self.cache: 35 print(‘驗證碼輸入正確‘) 36 self.t.cancel() 37 break 38 39 obj=Code() 40 obj.check()
並發編程 - 線程 - 1.互斥鎖/2.GIL解釋器鎖/3.死鎖與遞歸鎖/4.信號量/5.Event事件/6.定時器