python網路程式設計--程序的方法和通訊,鎖, 佇列,生產者消費者模型
阿新 • • 發佈:2019-01-10
1.程序的其他方法
程序:正在進行的一個過程或者說一個任務.負責執行任務的是cpu,程序之間的空間是相互隔離的
使用multiprocess模組來開啟程序
Process([group [, target[, name [, args [,kwargs]]]]])由該類例項化的物件,可用來開啟一個子程序
開啟程序:
from multiprocess import Process
p = Process(target=方法名)
引數介紹:
group未使用,值始終為None target表示呼叫物件,即子程序要執行的任務 args表示呼叫物件的位置引數元組, args(1,) kwargs表示呼叫物件的字典,kwargs = {'name':'bob'} name為子程序的名稱
方法:
p.start()開啟程序
p.run()程序啟動時執行的方法,正是它去呼叫target指定的函式,我們自定義類的類中一定要實現該方法
p.terminate()強制終止程序p,不會進行任何清理操作,如果p建立了子程序,該子程序就成了殭屍程序,使用該方法要小心這種情況.如果p還儲存了一個鎖那麼也將不會釋放,進而導致死鎖
屬性:
p.daemon守護程序
p.name程序的名稱
p.pid 程序的pid 也使用 os.getpid來檢視 os.getppid為檢視父程序名字
示例:
import time import os from multiprocessing import Process def f1(): print('子程序的pid',os.getpid()) print('子程序的父程序的pid',os.getppid()) print('aaa') def f2(): print('bbb') if __name__ == '__main__': p1 = Process(target=f1,name='寶寶1') p2 = Process(target=f2,) p1.start() p2.start() print(p1.name) print('子程序的pid',p1.pid) print('父程序的id',os.getpid()) #程序的其他方法 def f1(): time.sleep(5) print('子程序1號') if __name__ == '__main__': p = Process(target=f1,) p.start() print(p.is_alive()) #判斷子程序是否還活著,是否還在執行 p.terminate() #給作業系統傳送一個結束程序的訊號 time.sleep(0.5) print(p.is_alive())
驗證程序之間的空間是相互隔離的:
from multiprocessing import Process
num = 100
def f1():
global num
num = 3
print('子程序中的num',num)
print('>>>>>',num)
if __name__ == '__main__':
p = Process(target=f1,)
p.start()
p.join()
print('主程序中的num',num)
2.守護程序
主程序結束,守護程序也跟著結束
p.daemon:
預設值為False,如果設為True,代表p為後臺執行的守護程序,當p的父程序終止時,p也隨即終止,並且設定為True之後,p不能建立位元組的新程序,必須在p.start()之前設定
1.守護程序會在主程序程式碼執行結束後就終止
2.守護程序內再無法開啟子程序,否則丟擲異常:
AssertionError: daemonic processes are not allowed to have children
如果我們有兩個任務需要併發執行,那麼開一個主程序和一個子程序分別去執行就可以了,如果子程序的任務在主程序任務結束後就沒有存在的必要了,那麼該子程序應該在開啟前就被設定成守護程序.主程序程式碼執行結束,守護程序隨即終止
守護程序示例:
import time
from multiprocessing import Process
def f1():
time.sleep(3)
print('xxxx')
def f2():
time.sleep(5)
print('普通子程序的程式碼')
if __name__ == '__main__':
p = Process(target=f1,)
p.daemon = True #將該程序設定為守護程序,必須寫在start之前,意思如果我的主程序程式碼執行結束了,你這個子程序不管執行到什麼地方,都直接結束
p.start()
#開啟一個普通的子程序來驗證一下守護程序的結束只和主程序的程式碼執行結束有關係,而整個程式的結束需要主程序和普通的子程序的程式碼都執行結束才結束
p2 = Process(target=f2,)
p2.start()
#等待2號普通程序的結束,才繼續執行下面主程序中的程式碼
# p2.join()
#守護程序會跟跟著父程序的程式碼執行結束,就結束
print('主程序結束')
3.程序鎖/互斥鎖
程序之間資料不共享,但是共享同一套檔案系統,所以訪問同一個檔案,或同一個列印終端沒有問題.但共享帶來競爭,進而導致錯亂,這時就需要加鎖處理.互斥鎖的原理就是把病發改成序列,降低了效率,但保證了資料安全不錯亂.
互斥鎖與join()
join()將一個任務整體序列,互斥鎖的好處就是可以將一個任務中的某一段程式碼序列.
loc = Lock()
#第一種方式
def func(loc):
loc.acquire()
需要鎖的程式碼
loc.release()
#第二種方式
With loc:
需要鎖的程式碼
4.程序間的資料共享
通過引入manager,結合Lock來實現程序之間的資料共享
1.使用for迴圈來建立子程序
import time
from multiprocessing import Process
def f1():
time.sleep(0.5)
print('xxx')
if __name__ == '__main__':
p_list = []
#for迴圈建立子程序,並且完成主程序等待所有子程序執行結束,才繼續執行
for i in range(10):
p = Process(target=f1,)
p.start()
p_list.append(p)
p.join()
# for pp in p_list:
# pp.join()
print('主程序結束')
2.資料共享
為了保證程序間使用資料的安全,我們對資料處理時經常這樣:
a = 10
tmp = a
tmp -= 1
a = tmp
a -= 1 # a = a - 1
示例:
def f1(m_d,l2):
# m_d['num'] -= 1 #
with l2:
# l2.acquire()
tmp = m_d['num']
tmp -= 1
time.sleep(0.1)
m_d['num'] = tmp
# l2.release()
if __name__ == '__main__':
m = Manager()
l2 = Lock()
m_d = m.dict({'num':100})
p_list = []
for i in range(10):
p = Process(target=f1,args=(m_d,l2))
p.start()
p_list.append(p)
[pp.join() for pp in p_list]
print(m_d['num'])
4.佇列
程序彼此之間互相隔離,要實現程序間通訊(IPC),multiprocessing模組支援兩種形式:佇列和管道,這兩種訊息之間都是使用訊息傳遞的
1.建立佇列的類(底層就是管道和鎖定的方式實現)
Queue([maxsize]):建立共享的程序佇列,Queue是多程序安全的佇列,可以使用Queue實現多程序之間的資料傳遞
2.引數介紹:
maxsize是佇列中允許的最大項數,省略則無大小限制
需要明確:
1.佇列中存放的是訊息而非大資料
2.佇列佔用的是記憶體空間,因而maxsize即便是無大小限制也受限於記憶體大小
3.主要方法介紹:
q.put()方法用來插入資料到佇列中
q.get()方法可以從佇列讀取並且刪除一個元素
q = Queue(5)
q.put() #滿了會等待
q.get() #沒有資料了會等待
q.qsize()
q.empty() 不可靠
q.full()不可靠
q.get_nowait() #不等待,但是報錯
q.put_nowait() #不等待,但是報錯
基於佇列的程序間通訊
from multiprocessing import Process,Queue
def f1(q):
q.put('約嗎?')
if __name__ == '__main__':
q = Queue(3)
p = Process(target=f1,args=(q,))
p.start()
son_p_msg = q.get()
print('來自子程序的訊息:',son_p_msg)
5.生產者消費者模型
1.程式中有兩類角色
一類負責生產資料(生產者)
一類負責處理資料(消費者)
2.引入生產者消費者模型為了解決的問題是
平衡生產者與消費者之間的速度差
程式解開耦合
3.如何實現生產者消費者模型
生產者<--->佇列<--->消費者
通過佇列實現一個生產者<--->消費者模型,基於joinnamlequeue的在下面,這裡只是呈現基本模型:
生產包子---> 吃包子
import time
from multiprocessing import Process,Queue
#生產者
def producer(q):
for i in range(10):
time.sleep(0.7)
s = '大包子%s號'%i
print(s+'新鮮出爐,拿去用')
q.put(s)
def consumer(q):
while 1:
time.sleep(1)
baozi = q.get()
print(baozi+'被吃了')
if __name__ == '__main__':
q = Queue(10)
pro_p = Process(target=producer,args=(q,))
con_p = Process(target=consumer,args=(q,))
pro_p.start()
con_p.start()
6.程序佇列:JoinableQueue([maxsize])
這就像一個Queue物件,但佇列允許專案的使用者通知生成者專案已經被成功處理.通知程序是使用共享的訊號和條件變數來實現的
maxsize是佇列中允許最大項數,省略則無大小限制
q.task_done():使用者使用此方法發出訊號,表示q.get()的返回專案已經被處理.如果呼叫此方法的次數大於從佇列中刪除專案的數量,將引發ValueError異常
q.join():生產者呼叫此方法進行阻塞,直到佇列中所有的專案均被處理.阻塞將持續到佇列中的每個專案均
呼叫q.task_done()方法為止
基於JoinableQueue來實現生產者消費者模型:
import time
from multiprocessing import Process,Queue,JoinableQueue
#生產者
def producer(q):
for i in range(10):
time.sleep(0.2)
s = '大包子%s號'%i
print(s+'新鮮出爐,拿去用')
q.put(s)
q.join() #就等著task_done()訊號的數量,和我put進去的數量相同時,才繼續執行
print('所有的任務都被處理了,繼續潛行吧騷年們')
def consumer(q):
while 1:
time.sleep(0.5)
baozi = q.get()
print(baozi+'被吃了')
q.task_done() #給佇列傳送一個取出的這個任務已經處理完畢的訊號
if __name__ == '__main__':
# q = Queue(30)
q = JoinableQueue(30) #同樣是一個長度為30的佇列
pro_p = Process(target=producer,args=(q,))
con_p = Process(target=consumer,args=(q,))
pro_p.start()
con_p.daemon = True
con_p.start()
pro_p.join()
print('主程序結束')