python全棧開發day33-進程間的通信、進程間的數據共享,進程池
一、昨日內容回顧:
1. 守護進程
1)、p.saemon,
2 )、p.terminate
3 )、p.join
2. 同步控制
1)、鎖,Lock
互斥鎖,解決數據安全、進程之間資源搶占問題。
2)、信號量,Semaphore
鎖+計數器
3)、事件,Event
通過一個標誌位flag來控制進程的阻塞和執行。
3. 多進程實現tcp協議的socket的sever端
1)子進程中不能使用input
2)允許端口的重用設置
3)妥善處理sk的close確保操作系統的資源能夠被及時回收。
import socket from multiprocessing import Process def func(conn): conn.send(b‘hello‘) data = conn.recv(1024) print(data.decode(‘utf-8‘)) conn.close() if __name__ == ‘__main__‘: sk = socket.socket() sk.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,server1) sk.bind((‘127.0.0.1‘, 9000)) sk.listen(5) try: while True: con, addr = sk.accept() p = Process(target=func, args=(con,)) p.start() finally: sk.close()
import socket sk = socket.socket() sk.connect((‘127.0.0.1‘, 9000)) data = sk.recv(1024)clientprint(data) msg = input(‘>>>‘).encode(‘utf-8‘) sk.send(msg)
二、今日內容總結:
1、進程間的通信:
1)、隊列 Queue:隊列是加鎖的,在多進程之間對數據的管理是安全的
維護了一個先進先出的順序,且保證了數據在進程之間是安全的。
put,get,full,empty,get_nowait,put_nowait
生產者和消費者模型:
(1)、解決生產消費供需關系,生產的東西不夠吃,就再開啟一個進程生產。。。
(2)、解決消費者不能結束消費完物品的循環和阻塞問題,隊列中引入None,
讓消費者再取的時候判斷是否遇到None,遇到則結束。有幾個消費者就隊列中就put幾個None
(3)、解決生產者生產完成後主程序才結束問題,對生產者的進程進程join阻塞
JoinableQueue:
join和task_done方法:
join會阻塞隊列,直至隊列中的數據被取完,且執行了一個task_done,程序才會繼續執行。
from multiprocessing import Process, Queue import time, random def consumer(name, q): while True: time.sleep(random.randint(1, 3)) food = q.get() if food is None: break print(‘%s吃了%s‘ % (name, food)) def producer(name, food, q): for i in range(10): time.sleep(random.randint(1, 5)) q.put(‘%s生產了%s%s‘ % (name, food, i)) print(‘%s生產了數據%s%s‘ % (name, food, i)) if __name__ == ‘__main__‘: q = Queue() p1 = Process(target=producer, args=(‘egon‘, ‘面包‘, q)) p2 = Process(target=producer, args=(‘taibai‘, ‘骨頭‘, q)) p1.start() p2.start() c1 = Process(target=consumer, args=(‘alex‘, q)) c2 = Process(target=consumer, args=(‘firedragon‘, q)) c1.start() c2.start() p1.join() p2.join() q.put(None) q.put(None)生產者和消費者模型例子None
from multiprocessing import JoinableQueue,Process import time def consumer(name,q): while True: obj = q.get() time.sleep(0.3) print(‘%s吃了一個%s‘ % (name, obj)) q.task_done() if __name__ == ‘__main__‘: q = JoinableQueue() for i in range(10): q.put(‘food%s‘ % i) p1 = Process(target=consumer, args=(‘alex‘, q)) p1.daemon = True p1.start() q.join() # 阻塞隊列,直至隊列的數據被取完,且執行了一個task_done() # p1為守護進程,主程序代碼執行完畢後,守護進程隨之結束,裏邊的循環自然也結束了。生產者和消費者模型JoinableQueue
2)、管道 Pipe:底層實現是pickle,對數據的管理是不安全的,隊列的實現機制就是管道+鎖
雙向通信:利用pickle實現的
收不到,就阻塞
# 管道的EOFError是怎麽報出來的(同時關閉主進程的lp和子進程的lp就會報出EOFError)
# 管道在數據管理上是不安全的
# 隊列的實現機制 就是 管道+鎖
from multiprocessing import Pipe,Process # lp,rp = Pipe() # lp.send(‘hello‘) # print(rp.recv()) # # print(rp.recv()) # 沒有數據在此阻塞進程 # # rp.send() # 不能發送空數據 # lp.send([1,2,3]) # print(rp.recv()) def consumer(lp,rp): lp.close() #1.這個發關閉 while True: print(rp.recv()) if __name__ == ‘__main__‘: lp, rp = Pipe() Process(target=consumer, args=(lp, rp)).start() Process(target=consumer, args=(lp, rp)).start() Process(target=consumer, args=(lp, rp)).start() Process(target=consumer, args=(lp, rp)).start() #rp.close() for i in range(100): lp.send(‘food%i‘ % i) lp.close() #2.這個關閉 這兩關閉才會報錯EOFError管道例子
2、進程之間的數據共享,Manager
Manager創建的數據(如字典等)可以在進程之間共享,涉及數據操作要加上鎖,不然會出現數據錯亂。
m = Manager() dic = m.dict({‘count’:100})
with Manager() as m: dic = m.dict({‘count’:100}) 但涉及dic的操作代碼必須在with的縮進執行
from multiprocessing import Lock,Manager,Process def func(dic_tmp, lock_tmp): with lock_tmp: dic_tmp[‘count‘] -= 1 if __name__ == ‘__main__‘: lock = Lock() with Manager() as m: dic = m.dict({‘count‘: 50}) p_lst = [] for i in range(50): p = Process(target=func, args=(dic, lock)) p.start() p_lst.append(p) for i in p_lst: i.join() print(dic)50個進程同時操作一個字典的例子
3、進程池
進程池使用場景PK多進程:
1.對於純計算的代碼,使用進程池更好(個人理解,高效利用cpu沒有了節省了進程的開啟和回收時間,也節省操作系統調度進程切換的時間)
2.對於高IO的代碼,沒有更好選擇的情況下使用多進程。
總結:使用進程池比起多進程,節省了開啟進程回收進程資源的時間,給操作系統調度進程降低了難度。
進程池apply(同步)添加入池方法和apply_async(異步)
使用進程池提交任務方法:
p = Pool(5)
p.appy(func=***,args=(,))) #同步提交任務 沒有多進程的優勢
p.apply_async(func=***,args=(,)) #異步提交任務
p.close() #關閉進程池,阻止向進程池添加新的任務
p.join() #依賴close,進程池必須先close後join(個人理解應該是要阻塞執行完進程池的任務,才進入非阻塞狀態)
------代碼-------
from multiprocessing import Pool,Process import time,random def wahaha(num): time.sleep(random.randint(1,3)) print(‘num:%s‘ % num**num) if __name__ == ‘__main__‘: # -------------------------適合高計算-------------------------------------------- p = Pool(5) # start = time.time() # for i in range(100): # p.apply_async(func=wahaha,args=(i,)) # # p.close() # p.join() # print(time.time()-start) start = time.time() p.map(func=wahaha,iterable=range(101)) print(time.time()-start) # -------------------------適合高IO-------------------------------------------- # start = time.time() # p_lst = [] # for i in range(101): # p = Process(target=wahaha,args=(i,)) # p.start() # p_lst.append(p) # for i in p_lst: # i.join() # print(time.time() - start)View Code
使用map添加任務的方法以及它和普通(apply_async)方法的區別:
p.map(func=***,iterable=range(101))
優點:就是一個任務函數,個一個itetable,節省了for循環和close,join,是一種簡便寫法。
區別:apply_async和map相比,操作復雜,但是可以通過get方法獲取返回值,而map不行。
def wahaha(num): print(num) return num*‘*‘ if __name__ == ‘__main__‘: p = Pool(5) start = time.time() result_lst = [] for i in range(100): res = p.apply_async(func=wahaha,args=(i,)) result_lst.append(res) print(result_lst) for j in result_lst:print(j.get()) p.close() p.join() print(time.time()-start)apply_async使用get獲取返回值
回調函數:可以接收func函數的返回值。但callback函數在主進程中運行。
p.apply_async(func=***,args=(*,),callback=回調函數))
from multiprocessing import Pool,Process import os def wahaha(num): print(‘子進程:‘,os.getpid()) return num**num def callb(argv): print(os.getpid()) print(argv) if __name__ == ‘__main__‘: print(‘主進程‘, os.getpid()) p = Pool(5) p.apply_async(func=wahaha,args=(1,),callback=callb) p.close() p.join()callback
三、預習和擴展:
python全棧開發day33-進程間的通信、進程間的數據共享,進程池