1. 程式人生 > >並發編程 - 線程 - 1.互斥鎖/2.GIL解釋器鎖/3.死鎖與遞歸鎖/4.信號量/5.Event事件/6.定時器

並發編程 - 線程 - 1.互斥鎖/2.GIL解釋器鎖/3.死鎖與遞歸鎖/4.信號量/5.Event事件/6.定時器

級別 src 總結 alex post strip CQ bsp 回收機制

1.互斥鎖:
原理:將並行變成串行
精髓:局部串行,只針對共享數據修改
保護不同的數據就應該用不用的鎖
 1 from threading import Thread, Lock
 2 import time
 3 
 4 n = 100
 5 
 6 def task():
 7     global n
 8     mutex.acquire()  # 效率低了 但是數據安全了
 9     temp = n
10     time.sleep(0.1)  # 100個線程 都拿到了100  所以就是 100個線程100-1
11     n = temp - 1
12     mutex.release()
13 14 15 if __name__ == __main__: 16 mutex = Lock() 17 t_l = [] 18 for i in range(100): 19 t = Thread(target=task) 20 t_l.append(t) 21 t.start() 22 23 for t in t_l: 24 t.join() 25 26 print(, n) 27 """ 28 主 99 原因: 100個線程 都拿到了100 所以就是 100個線程100-1 數據不安全 效率高但是不安全
29 要將並行改為串行 30 """ 31 """ 32 主 0 原因:效率低了 但是數據安全了 33 """
2.GIL: global interpreter lock
python3 test.py
ps aux | grep test # linux
tasklist | findstr python # windows python.exe

運行python 會有幾步:
1.會有一個進程,進程內存空間 python解釋器的代碼先加載到內存空間
2.test.py 內容加載到內存
3.解釋執行;代碼交給了python解釋器
線程幹活指向了python代碼 python代碼當作參數傳給了解釋器
線程拿到解釋器的代碼,拿著python代碼當作參數,執行
垃圾回收線程運行解釋器的代碼
垃圾回收線程和某一個線程沖突了,數據不安全,

技術分享圖片


開多個進程,GIL就沒影響了, cpython解釋器垃圾回收線程定期啟動一個
GIL:互斥鎖,保證數據的安全 對CPython解釋器,同一時間只有一個線程運行
GIL.acquire() 這樣垃圾線程和線程就不會沖突了,這樣回收機制就變得安全了
GIL.release()
python解釋器,多線程有GIL存在,保證了一個進程下面多個線程的執行是一個一個執行的

有GIL與自動的鎖的工作原理:

技術分享圖片


總結:
1.GIL 一個進程內的多個線程同一時間只能運行一個線程,垃圾回收線程是安全的
2.針對不同的數據,就應該加不同的鎖,解釋器級別的GIL鎖,只能保護解釋器級別的數據,
不能保護自己的數據,針對自己的共享數據還要加鎖;

線程首先搶的是;GIL鎖,之後才是mutex
官網:
結論:在Cpython解釋器中,同一個進程下開啟的多線程,同一時刻只能有一個線程執行,無法利用多核優勢

GIL的存在:同一時刻,只能有一個線程在運行
多核,多進程,但進程開銷大,多線程,又不能用多核 ?

cpu幹計算的,多個cpu
1.如果是幹計算的操作,多核省時間
2.如果幹IO阻塞型操作,多核沒用

程序運行:都會幹計算和IO操作

四個任務:
1.1個核:開多線程 ,因為多進程能用上多核
2.多核:
計算密集型:用多進程,用多核,eg:金融行業的,計算比較多,雖然多進程開銷大,但多核,保證了計算快
IO密集型:用多線程,同一時間只能用一個核,1個核一個進程,多線程就在一個核上來回切和四個核來回切是一樣的

現在寫的軟件:
網絡打交道,網絡的IO
IO密集型,用多線程
 1 """
 2 計算密集型應該用: 多進程 效率高  
 3 """
 4 from multiprocessing import Process
 5 from threading import Thread
 6 import os,time
 7 
 8 def work():
 9     res=0
10     for i in range(100000000):
11         res*=i
12 
13 
14 if __name__ == __main__:
15     l=[]
16     print(os.cpu_count()) #本機為8核
17     start=time.time()
18     for i in range(8):
19         # p=Process(target=work) #耗時8s多
20         p=Thread(target=work) #耗時37s多
21         l.append(p)
22         p.start()
23     for p in l:
24         p.join()
25     stop=time.time()
26     print(run time is %s %(stop-start))
27 
28 """
29 IO密集型:多線程 效率高
30 """
31 from multiprocessing import Process
32 from threading import Thread
33 import threading
34 import os,time
35 def work():
36     time.sleep(2)
37     print(===>)
38 
39 if __name__ == __main__:
40     l=[]
41     print(os.cpu_count()) #本機為8核
42     start=time.time()
43     for i in range(400):
44         # p=Process(target=work) #耗時8s多,大部分時間耗費在創建進程上
45         p=Thread(target=work) #耗時2s多
46         l.append(p)
47         p.start()
48     for p in l:
49         p.join()
50     stop=time.time()
51     print(run time is %s %(stop-start))
3.死鎖:
你拿著我的鎖,我拿著你的鎖

互斥鎖:Lock()
互斥鎖只能acquire一次

遞歸鎖:RLock()
可以連續acquire多次,每acquire一次計數器+1,
只有計數為0時,才能被搶到acquire
 1 from threading import Thread,Lock
 2 import time
 3 
 4 mutexA=Lock()
 5 mutexB=Lock()
 6 
 7 class MyThread(Thread):
 8     def run(self):
 9         self.f1()
10         self.f2()
11 
12     def f1(self):
13         mutexA.acquire()
14         print(%s 拿到了A鎖 %self.name)
15 
16         mutexB.acquire()
17         print(%s 拿到了B鎖 %self.name)
18         mutexB.release()
19 
20         mutexA.release()
21 
22 
23     def f2(self):
24         mutexB.acquire()
25         print(%s 拿到了B鎖 % self.name)
26         time.sleep(0.1)
27 
28         mutexA.acquire()
29         print(%s 拿到了A鎖 % self.name)
30         mutexA.release()
31 
32         mutexB.release()
33 
34 if __name__ == __main__:
35     for i in range(10):
36         t=MyThread()
37         t.start()
38 """
39 Thread-1 拿到了A鎖  # 死鎖了 卡住了
40 Thread-1 拿到了B鎖
41 Thread-1 拿到了B鎖
42 Thread-2 拿到了A鎖
43 """
 1 # 互斥鎖只能acquire一次
 2 # from threading import Thread,Lock
 3 #
 4 # mutexA=Lock()
 5 #
 6 # mutexA.acquire()
 7 # mutexA.release()
 8 
 9 # 遞歸鎖:可以連續acquire多次,每acquire一次計數器+1,只有計數為0時,才能被搶到acquire
10 from threading import Thread,RLock
11 import time
12 
13 mutexB=mutexA=RLock()
14 
15 class MyThread(Thread):
16     def run(self):
17         self.f1()
18         self.f2()
19 
20     def f1(self):
21         mutexA.acquire()
22         print(%s 拿到了A鎖 %self.name)
23 
24         mutexB.acquire()
25         print(%s 拿到了B鎖 %self.name)
26         mutexB.release()
27 
28         mutexA.release()
29 
30 
31     def f2(self):
32         mutexB.acquire()
33         print(%s 拿到了B鎖 % self.name)
34         time.sleep(2)
35 
36         mutexA.acquire()
37         print(%s 拿到了A鎖 % self.name)
38         mutexA.release()
39 
40         mutexB.release()
41 
42 if __name__ == __main__:
43     for i in range(10):
44         t=MyThread()
45         t.start()
46 """
47 Thread-1 拿到了A鎖  # 解決了 死鎖
48 Thread-1 拿到了B鎖
49 Thread-1 拿到了B鎖
50 Thread-1 拿到了A鎖
51 Thread-2 拿到了A鎖
52 Thread-2 拿到了B鎖
53 Thread-2 拿到了B鎖
54 Thread-2 拿到了A鎖
55 Thread-4 拿到了A鎖
56 Thread-4 拿到了B鎖
57 Thread-5 拿到了A鎖
58 Thread-5 拿到了B鎖
59 Thread-5 拿到了B鎖
60 Thread-5 拿到了A鎖
61 Thread-7 拿到了A鎖
62 Thread-7 拿到了B鎖
63 Thread-7 拿到了B鎖
64 Thread-7 拿到了A鎖
65 Thread-9 拿到了A鎖
66 Thread-9 拿到了B鎖
67 Thread-9 拿到了B鎖
68 Thread-9 拿到了A鎖
69 Thread-3 拿到了A鎖
70 Thread-3 拿到了B鎖
71 Thread-3 拿到了B鎖
72 Thread-3 拿到了A鎖
73 Thread-6 拿到了A鎖
74 Thread-6 拿到了B鎖
75 Thread-6 拿到了B鎖
76 Thread-6 拿到了A鎖
77 Thread-10 拿到了A鎖
78 Thread-10 拿到了B鎖
79 Thread-10 拿到了B鎖
80 Thread-10 拿到了A鎖
81 Thread-8 拿到了A鎖
82 Thread-8 拿到了B鎖
83 Thread-8 拿到了B鎖
84 Thread-8 拿到了A鎖
85 Thread-4 拿到了B鎖
86 Thread-4 拿到了A鎖
87 """
4.信號量
  信號量也是一把鎖,可以指定信號量為5,對比互斥鎖同一時間只能有一個任務搶到鎖去執行,
   信號量同一時間可以有5個任務拿到鎖去執行
  信號量:同一時間有多個線程在進行
 1 from threading import Thread,Semaphore,currentThread
 2 import time,random
 3 
 4 sm=Semaphore(1)
 5 
 6 def task():
 7     # sm.acquire()
 8     # print(‘%s in‘ %currentThread().getName())
 9     # sm.release()
10     with sm:  # 類似於sm.acquire() # 同一時間可以來3個人,1個人,或者2個人
11         print(%s in %currentThread().getName())
12         time.sleep(random.randint(1,3))
13 
14 
15 if __name__ == __main__:
16     for i in range(10):
17         t=Thread(target=task)
18         t.start()
19 """
20 Thread-1 in
21 Thread-2 in
22 Thread-3 in
23 
24 Thread-4 in
25 
26 
27 Thread-6 in
28 Thread-5 in
29 Thread-7 in
30 
31 
32 Thread-8 in
33 Thread-9 in
34 
35 Thread-10 in 
36 """
5.Event:
多個線程之間同步的,一個線程告訴另一些線程可以做其他的活了
event.wait()
event.wait(2)
event.set()
event.is_set()
event.clear()
 1 from threading import Thread,Event
 2 import time
 3 
 4 event=Event()
 5 # event.wait()   # 等 ...直到 set
 6 # event.set()
 7 
 8 
 9 def student(name):
10     print(學生%s 正在聽課 %name)
11     # event.wait()  # 學生要等7秒 才能下課
12     event.wait(2)   # 學生等2秒 直接下課了
13 
14     print(學生%s 課間活動 %name)
15 
16 
17 def teacher(name):
18     print(老師%s 正在授課 %name)
19     time.sleep(7)
20     event.set()
21 
22 
23 if __name__ == __main__:
24     stu1=Thread(target=student,args=(alex,))
25     stu2=Thread(target=student,args=(wxx,))
26     stu3=Thread(target=student,args=(yxx,))
27     t1=Thread(target=teacher,args=(egon,))
28 
29     stu1.start()
30     stu2.start()
31     stu3.start()
32     t1.start()
33 
34 
35 # ------------------
36 # 設置鏈接的超時時間
37 from threading import Thread,Event,currentThread
38 import time
39 
40 event=Event()
41 
42 def conn():
43     # print(‘%s is connecting‘%currentThread().getName())
44     # event.wait()
45     # print(‘%s is connected‘%currentThread().getName())
46 
47     n=0
48     while not event.is_set():
49         if n == 3:
50             print(%s try too many times %currentThread().getName())
51             return
52         print(%s try %s %(currentThread().getName(),n))
53         event.wait(0.5)
54         n+=1
55 
56     print(%s is connected %currentThread().getName())
57 
58 
59 def check():
60     print(%s is checking %currentThread().getName())
61     time.sleep(5)
62     event.set()
63 
64 
65 if __name__ == __main__:
66     for i in range(3):
67         t=Thread(target=conn)
68         t.start()
69     t=Thread(target=check)
70     t.start()
71 """
72 Thread-1 try 0
73 Thread-2 try 0
74 Thread-3 try 0
75 Thread-4 is checking
76 Thread-3 try 1
77 Thread-2 try 1
78 Thread-1 try 1
79 Thread-3 try 2
80 Thread-1 try 2
81 Thread-2 try 2
82 Thread-3 try too many times
83 Thread-2 try too many times
84 Thread-1 try too many times
85 """
6.定時器:Timer
  t=Timer(5,task,args=(‘egon‘,))
  t.start()
  t.cancel()
 1 from threading import Timer
 2 
 3 def task(name):
 4     print(hello %s %name)
 5 
 6 t=Timer(5,task,args=(egon,))  # 就是起了一個線程
 7 t.start()
 8 
 9 # ----------------------
10 from threading import Timer
11 import random
12 
13 class Code:
14     def __init__(self):
15         self.make_cache()
16 
17     def make_cache(self,interval=10):
18         self.cache=self.make_code()
19         print(self.cache)
20         self.t=Timer(interval,self.make_cache)
21         self.t.start()
22 
23     def make_code(self,n=4):
24         res=‘‘
25         for i in range(n):
26             s1=str(random.randint(0,9))
27             s2=chr(random.randint(65,90))
28             res+=random.choice([s1,s2])
29         return res
30 
31     def check(self):
32         while True:
33             code=input(請輸入你的驗證碼>>: ).strip()
34             if code.upper() == self.cache:
35                 print(驗證碼輸入正確)
36                 self.t.cancel()
37                 break
38 
39 obj=Code()
40 obj.check()




並發編程 - 線程 - 1.互斥鎖/2.GIL解釋器鎖/3.死鎖與遞歸鎖/4.信號量/5.Event事件/6.定時器