1. 程式人生 > >Python之旅.第九章.並發編程

Python之旅.第九章.並發編程

導入 pid 線程理論 self. Go getname 一行代碼 ack 互斥

一、上節課復習

1 守護進程:如果父進程將子進程設置為守護進程,那麽在主進程代碼運行完畢後守護進程就立即被回收

2 互斥鎖:用來將並發編程串行,犧牲了效率而保證了數據安全

3 隊列:管道+

二、守護進程例子

解決:消費者取空列表後q.get()阻塞的問題

方法一:

from multiprocessing import Process

import time

def foo():

print(123)

time.sleep(1)

print("end123")

def bar():

print(456)

time.sleep(3)

print("end456")

if __name__ == ‘__main__‘:

p1=Process(target=foo)

p2=Process(target=bar)

p1.daemon=True #主進程代碼運行完畢,守護進程就會結束

p1.start()

p2.start()

print("main-------")

三、守護進程與應用

import time

import random

from multiprocessing import Process,Queue

def consumer(name,q):

while True:

res=q.get()

if res is None:break

time.sleep(random.randint(1,3))

print(‘\033[46m消費者===%s 吃了 %s\033[0m‘ %(name,res))

def producer(name,q,food):

for i in range(5):

time.sleep(random.randint(1,2))

res=‘%s%s‘ %(food,i)

q.put(res)

print(‘\033[45m生產者者===%s 生產了 %s\033[0m‘ %(name,res))

if __name__ == ‘__main__‘:

#1、共享的盆

q=Queue()

#2、生產者們

p1=Process(target=producer,args=(‘egon‘,q,‘包子‘))

p2=Process(target=producer,args=(‘劉清政‘,q,‘泔水‘))

p3=Process(target=producer,args=(‘楊軍‘,q,‘米飯‘))

#3、消費者們

c1=Process(target=consumer,args=(‘alex‘,q))

c2=Process(target=consumer,args=(‘梁書東‘,q))

p1.start()

p2.start()

p3.start()

c1.start()

c2.start()

# 在生產者生產完畢後,往隊列的末尾添加一個結束信號None

p1.join()

p2.join()

p3.join()

# 有幾個消費者就應該放幾個結束信號

q.put(None)

q.put(None)

方法二:

import time

import random

from multiprocessing import Process,JoinableQueue

def consumer(name,q):

while True:

res=q.get()

time.sleep(random.randint(1,3))

print(‘\033[46m消費者===%s 吃了 %s\033[0m‘ %(name,res))

q.task_done()

def producer(name,q,food):

for i in range(5):

time.sleep(random.randint(1,2))

res=‘%s%s‘ %(food,i)

q.put(res)

print(‘\033[45m生產者者===%s 生產了 %s\033[0m‘ %(name,res))

if __name__ == ‘__main__‘:

#1、共享的盆

q=JoinableQueue()

#2、生產者們

p1=Process(target=producer,args=(‘egon‘,q,‘包子‘))

p2=Process(target=producer,args=(‘劉清政‘,q,‘泔水‘))

p3=Process(target=producer,args=(‘楊軍‘,q,‘米飯‘))

#3、消費者們

c1=Process(target=consumer,args=(‘alex‘,q))

c2=Process(target=consumer,args=(‘梁書東‘,q))

c1.daemon=True # c1.daemon=True 必須在c1.start()

c2.daemon=True

p1.start()

p2.start()

p3.start()

c1.start()

c2.start()

# 確定生產者確確實實已經生產完畢

p1.join()

p2.join()

p3.join()

# 在生產者生產完畢後,拿到隊列中元素的總個數,然後直到元素總數變為0q.join()這一行代碼才算運行完畢

q.join()

#q.join()一旦結束意味著隊列確實被取空,消費者已經確確實實把數據都取幹凈了

print(‘主進程結束‘)

四、線程理論

1、什麽是線程

線程指的是一條流水線的工作過程

進程不是執行單位,是資源單位

一個進程內自帶一個線程,線程是執行單位

2、進程VS線程

1、同一進程內的線程們共享該進程內資源,不同進程內的線程資源肯定是隔離的

2、創建線程的開銷比創建進程要小的多

4 線程中沒有父子關系。相較於子線程、主線程特殊之處在於其代變了主進程的生命周期。

主進程等待子進程結束然後結束,是為子進程回收資源。

主線程等待子線程結束然後結束,是等待這個進程的代碼(其他非守護線程)執行完畢。

主進程:執行完代碼就結束。

主線程:所以子線程結束才結束。

五、開啟線程的兩種方式

方式一:導入Thread模塊

from threading import Thread

import time

def task(name):

print(‘%s is running‘ %name)

time.sleep(3)

if __name__ == ‘__main__‘:

t=Thread(target=task,args=(‘egon‘,))

t.start()

print(‘主線程‘)

方式二:創建類繼承Thread

from threading import Thread

import time

class MyThread(Thread):

def run(self):

print(‘%s is running‘ %self.name)

time.sleep(3)

if __name__ == ‘__main__‘:

t=MyThread()

t.start()

print(‘主線程‘)

六、進程vs線程

1、瞅一瞅PID Process ID

from threading import Thread

import time,os

def task():

print(‘%s is running‘ %os.getpid())

time.sleep(3)

if __name__ == ‘__main__‘:

t=Thread(target=task,)

t.start()

print(‘主線程‘,os.getpid()) #一個進程中的子線程pid相同

2、線程創建開銷小

3、同一進程內的多個線程共享該進程內的資源

from threading import Thread

import time,os

x=1000

def task():

global x

x=0

if __name__ == ‘__main__‘:

t=Thread(target=task,)

t.start()

t.join()

print(‘主線程‘,x) #主線程 0

七、線程對象的其他方法

from threading import Thread,current_thread,active_count,enumerate

import time,os

def task():

print(‘%s is running‘ %current_thread().name) #Thread-1 is running

time.sleep(3)

if __name__ == ‘__main__‘:

t1=Thread(target=task,name=‘第一個線程‘)

t2=Thread(target=task,)

t3=Thread(target=task,)

t1.start()

t2.start()

t3.start()

print(t1.is_alive()) #True

print(active_count()) #4

print(enumerate()) #[<_MainThread(MainThread, started 4320768832)>, <Thread(第一個線程, started 123145551912960)>, <Thread(Thread-1, started 123145557168128)>, <Thread(Thread-2, started 123145562423296)>] #當前活躍的線程

print(‘主線程‘,current_thread().name) #主線程 MainThread

八、守護線程

from threading import Thread,current_thread

import time

def task():

print(‘%s is running‘ %current_thread().name)

time.sleep(3)

if __name__ == ‘__main__‘:

t1=Thread(target=task,name=‘第一個線程‘)

t1.daemon = True

t1.start()

print(‘主線程‘)

from threading import Thread

import time

def foo():

print(123)

time.sleep(5)

print("end123")

def bar():

print(456)

time.sleep(3)

print("end456")

if __name__ == ‘__main__‘:

t1=Thread(target=foo)

t2=Thread(target=bar)

t1.daemon=True

t1.start()

t2.start()

print("main-------")

‘‘‘

123

456

main-------

end456

‘‘‘

主進程:執行完代碼就結束。

主線程:所以子線程結束才結束。

總結:只要進程內沒有可執行的代碼守護就結束

九、線程互斥鎖

from threading import Thread,Lock

import time

mutex=Lock()

x=100

def task():

global x

mutex.acquire()

temp=x

time.sleep(0.1)

x=temp-1

mutex.release()

if __name__ == ‘__main__‘:

start=time.time()

t_l=[]

for i in range(100):

t=Thread(target=task)

t_l.append(t)

t.start()

for t in t_l:

t.join()

print(‘‘,x) #0

print(time.time()-start)

十、死鎖現象與遞歸鎖

from threading import Thread,Lock,RLock

import time

# mutexA=Lock() #如果用Lock(互斥鎖),會發生死鎖現象

# mutexB=Lock()

mutexA=mutexB=RLock() #是一把鎖,可連續acqruie,但只有其上的計數為0時其他線程才可對其調用

class MyThread(Thread):

def run(self):

self.f1()

self.f2()

def f1(self):

mutexA.acquire()

print(‘%s 拿到了A‘ %self.name)

mutexB.acquire()

print(‘%s 拿到了B‘ %self.name)

mutexB.release()

mutexA.release()

def f2(self):

mutexB.acquire()

print(‘%s 拿到了B‘ %self.name)

time.sleep(0.1)

mutexA.acquire()

print(‘%s 拿到了A‘ %self.name)

mutexA.release()

mutexB.release()

if __name__ == ‘__main__‘:

for i in range(10):

t=MyThread()

t.start()

print(‘‘)

十一、信號量

# from multiprocessing import Semaphore #進程和線程中皆可導入Semaphore模塊

from threading import Thread,Semaphore,current_thread

import time,random

sm=Semaphore(5) #5把鑰匙,即同時可以5個對象進行執行

def go_wc():

sm.acquire()

print(‘%s 上廁所ing‘ %current_thread().getName())

time.sleep(random.randint(1,3))

sm.release()

if __name__ == ‘__main__‘:

for i in range(23):

t=Thread(target=go_wc)

t.start()

Python之旅.第九章.並發編程