Python之旅.第九章.並發編程
一、上節課復習
1、 守護進程:如果父進程將子進程設置為守護進程,那麽在主進程代碼運行完畢後守護進程就立即被回收
2、 互斥鎖:用來將並發編程串行,犧牲了效率而保證了數據安全
3、 隊列:管道+鎖
二、守護進程例子
解決:消費者取空列表後q.get()阻塞的問題
方法一:
from multiprocessing import Process
import time
def foo():
print(123)
time.sleep(1)
print("end123")
def bar():
print(456)
time.sleep(3)
print("end456")
if __name__ == ‘__main__‘:
p1=Process(target=foo)
p2=Process(target=bar)
p1.daemon=True #主進程代碼運行完畢,守護進程就會結束
p1.start()
p2.start()
print("main-------")
三、守護進程與應用
import time
import random
from multiprocessing import Process,Queue
def consumer(name,q):
while True:
res=q.get()
if res is None:break
time.sleep(random.randint(1,3))
print(‘\033[46m消費者===》%s 吃了 %s\033[0m‘ %(name,res))
def producer(name,q,food):
for i in range(5):
time.sleep(random.randint(1,2))
res=‘%s%s‘ %(food,i)
q.put(res)
print(‘\033[45m生產者者===》%s 生產了 %s\033[0m‘ %(name,res))
if __name__ == ‘__main__‘:
#1、共享的盆
q=Queue()
#2、生產者們
p1=Process(target=producer,args=(‘egon‘,q,‘包子‘))
p2=Process(target=producer,args=(‘劉清政‘,q,‘泔水‘))
p3=Process(target=producer,args=(‘楊軍‘,q,‘米飯‘))
#3、消費者們
c1=Process(target=consumer,args=(‘alex‘,q))
c2=Process(target=consumer,args=(‘梁書東‘,q))
p1.start()
p2.start()
p3.start()
c1.start()
c2.start()
# 在生產者生產完畢後,往隊列的末尾添加一個結束信號None
p1.join()
p2.join()
p3.join()
# 有幾個消費者就應該放幾個結束信號
q.put(None)
q.put(None)
方法二:
import time
import random
from multiprocessing import Process,JoinableQueue
def consumer(name,q):
while True:
res=q.get()
time.sleep(random.randint(1,3))
print(‘\033[46m消費者===》%s 吃了 %s\033[0m‘ %(name,res))
q.task_done()
def producer(name,q,food):
for i in range(5):
time.sleep(random.randint(1,2))
res=‘%s%s‘ %(food,i)
q.put(res)
print(‘\033[45m生產者者===》%s 生產了 %s\033[0m‘ %(name,res))
if __name__ == ‘__main__‘:
#1、共享的盆
q=JoinableQueue()
#2、生產者們
p1=Process(target=producer,args=(‘egon‘,q,‘包子‘))
p2=Process(target=producer,args=(‘劉清政‘,q,‘泔水‘))
p3=Process(target=producer,args=(‘楊軍‘,q,‘米飯‘))
#3、消費者們
c1=Process(target=consumer,args=(‘alex‘,q))
c2=Process(target=consumer,args=(‘梁書東‘,q))
c1.daemon=True # c1.daemon=True 必須在c1.start() 前
c2.daemon=True
p1.start()
p2.start()
p3.start()
c1.start()
c2.start()
# 確定生產者確確實實已經生產完畢
p1.join()
p2.join()
p3.join()
# 在生產者生產完畢後,拿到隊列中元素的總個數,然後直到元素總數變為0,q.join()這一行代碼才算運行完畢
q.join()
#q.join()一旦結束意味著隊列確實被取空,消費者已經確確實實把數據都取幹凈了
print(‘主進程結束‘)
四、線程理論
1、什麽是線程
線程指的是一條流水線的工作過程
進程不是執行單位,是資源單位
一個進程內自帶一個線程,線程是執行單位
2、進程VS線程
1、同一進程內的線程們共享該進程內資源,不同進程內的線程資源肯定是隔離的
2、創建線程的開銷比創建進程要小的多
4、 線程中沒有父子關系。相較於子線程、主線程特殊之處在於其代變了主進程的生命周期。
主進程等待子進程結束然後結束,是為子進程回收資源。
主線程等待子線程結束然後結束,是等待這個進程的代碼(其他非守護線程)執行完畢。
主進程:執行完代碼就結束。
主線程:所以子線程結束才結束。
五、開啟線程的兩種方式
方式一:導入Thread模塊
from threading import Thread
import time
def task(name):
print(‘%s is running‘ %name)
time.sleep(3)
if __name__ == ‘__main__‘:
t=Thread(target=task,args=(‘egon‘,))
t.start()
print(‘主線程‘)
方式二:創建類繼承Thread
from threading import Thread
import time
class MyThread(Thread):
def run(self):
print(‘%s is running‘ %self.name)
time.sleep(3)
if __name__ == ‘__main__‘:
t=MyThread()
t.start()
print(‘主線程‘)
六、進程vs線程
1、瞅一瞅PID (Process ID)
from threading import Thread
import time,os
def task():
print(‘%s is running‘ %os.getpid())
time.sleep(3)
if __name__ == ‘__main__‘:
t=Thread(target=task,)
t.start()
print(‘主線程‘,os.getpid()) #一個進程中的子線程pid相同
2、線程創建開銷小
3、同一進程內的多個線程共享該進程內的資源
from threading import Thread
import time,os
x=1000
def task():
global x
x=0
if __name__ == ‘__main__‘:
t=Thread(target=task,)
t.start()
t.join()
print(‘主線程‘,x) #主線程 0
七、線程對象的其他方法
from threading import Thread,current_thread,active_count,enumerate
import time,os
def task():
print(‘%s is running‘ %current_thread().name) #Thread-1 is running
time.sleep(3)
if __name__ == ‘__main__‘:
t1=Thread(target=task,name=‘第一個線程‘)
t2=Thread(target=task,)
t3=Thread(target=task,)
t1.start()
t2.start()
t3.start()
print(t1.is_alive()) #True
print(active_count()) #4
print(enumerate()) #[<_MainThread(MainThread, started 4320768832)>, <Thread(第一個線程, started 123145551912960)>, <Thread(Thread-1, started 123145557168128)>, <Thread(Thread-2, started 123145562423296)>] #當前活躍的線程
print(‘主線程‘,current_thread().name) #主線程 MainThread
八、守護線程
from threading import Thread,current_thread
import time
def task():
print(‘%s is running‘ %current_thread().name)
time.sleep(3)
if __name__ == ‘__main__‘:
t1=Thread(target=task,name=‘第一個線程‘)
t1.daemon = True
t1.start()
print(‘主線程‘)
from threading import Thread
import time
def foo():
print(123)
time.sleep(5)
print("end123")
def bar():
print(456)
time.sleep(3)
print("end456")
if __name__ == ‘__main__‘:
t1=Thread(target=foo)
t2=Thread(target=bar)
t1.daemon=True
t1.start()
t2.start()
print("main-------")
‘‘‘
123
456
main-------
end456
‘‘‘
主進程:執行完代碼就結束。
主線程:所以子線程結束才結束。
總結:只要進程內沒有可執行的代碼守護就結束
九、線程互斥鎖
from threading import Thread,Lock
import time
mutex=Lock()
x=100
def task():
global x
mutex.acquire()
temp=x
time.sleep(0.1)
x=temp-1
mutex.release()
if __name__ == ‘__main__‘:
start=time.time()
t_l=[]
for i in range(100):
t=Thread(target=task)
t_l.append(t)
t.start()
for t in t_l:
t.join()
print(‘主‘,x) #0
print(time.time()-start)
十、死鎖現象與遞歸鎖
from threading import Thread,Lock,RLock
import time
# mutexA=Lock() #如果用Lock(互斥鎖),會發生死鎖現象
# mutexB=Lock()
mutexA=mutexB=RLock() #是一把鎖,可連續acqruie,但只有其上的計數為0時其他線程才可對其調用
class MyThread(Thread):
def run(self):
self.f1()
self.f2()
def f1(self):
mutexA.acquire()
print(‘%s 拿到了A鎖‘ %self.name)
mutexB.acquire()
print(‘%s 拿到了B鎖‘ %self.name)
mutexB.release()
mutexA.release()
def f2(self):
mutexB.acquire()
print(‘%s 拿到了B鎖‘ %self.name)
time.sleep(0.1)
mutexA.acquire()
print(‘%s 拿到了A鎖‘ %self.name)
mutexA.release()
mutexB.release()
if __name__ == ‘__main__‘:
for i in range(10):
t=MyThread()
t.start()
print(‘主‘)
十一、信號量
# from multiprocessing import Semaphore #進程和線程中皆可導入Semaphore模塊
from threading import Thread,Semaphore,current_thread
import time,random
sm=Semaphore(5) #5把鑰匙,即同時可以5個對象進行執行
def go_wc():
sm.acquire()
print(‘%s 上廁所ing‘ %current_thread().getName())
time.sleep(random.randint(1,3))
sm.release()
if __name__ == ‘__main__‘:
for i in range(23):
t=Thread(target=go_wc)
t.start()
Python之旅.第九章.並發編程