重修課程day32(網絡編程六之進程三)
什麽叫做水平擴展:增加計算機的數量,並沒有提高計算機的性能
什麽叫開源:開放源代碼
什麽叫做虛擬化:同時跑多個系統
一 JoinableQueue模塊
JoinableQueue模塊介紹:比Queue多了兩個函數,一個是task_done,另一個是join,都是專用於全球進行編程的,大多數用於生產者和消費者之間的。
task_done:是用在get後面的 ,告訴os已經處理完了內容。
join:是說Queue裏面的生產數據全部處理完了。
使用方法如下:
# import multiprocessing # import time # import random # def sheng(name,q,wuping): # for i in range(5): # time.sleep(random.random()) # ret=‘%s%s‘%(wuping,i) # q.put(ret) # print(‘廚師%s創建了%s‘%(name,ret)) # q.join() # def xiao(name,q): # while True: # time.sleep(random.random()) # ret=q.get() # print(‘%s吃了%s‘%(name,ret)) # q.task_done() # if __name__==‘__main__‘: # q=multiprocessing.JoinableQueue() # s1=multiprocessing.Process(target=sheng,args=(‘egon1‘,q,‘包子‘)) # s2=multiprocessing.Process(target=sheng,args=(‘egon2‘,q,‘rou‘)) # s3=multiprocessing.Process(target=sheng,args=(‘egon3‘,q,‘骨頭‘)) # x1=multiprocessing.Process(target=xiao,args=(‘alex1‘,q)) # x2=multiprocessing.Process(target=xiao,args=(‘alex2‘,q)) # x1.daemon=True # x2.daemon=True # s1.start() # s2.start() # s3.start() # x1.start() # x2.start() # s1.join() # s2.join() # s3.join()
二 Manager:共享內存空間
dict:共享的以惡搞字典,能夠被多個共享
如下1:
# import multiprocessing # def walk(d): # d[‘count‘]-=1 # if __name__==‘__main__‘: # m=multiprocessing.Manager() # d=m.dict({‘count‘:100}) # for i in range(100): # w=multiprocessing.Process(target=walk,args=(d,)) # w.start() # w.join() # print(d)
如下2:
# import multiprocessing # def walk(d,loak): # with loak: # d[‘count‘]-=1 # if __name__==‘__main__‘: # m=multiprocessing.Manager() # d=m.dict({‘count‘:100}) # lock=multiprocessing.Lock() # li=[] # for i in range(100): # w=multiprocessing.Process(target=walk,args=(d,lock)) # li.append(w) # w.start() # for j in li: # j.join() # print(j) # print(d)
三 進程池
什麽是進程池:創建一定數量的進程個數
同步和異步:提交任務的兩種方式。
Pool:創建進程池和控制進程的數目,默認的個數是根據CPU的核數
apply:傳入兩個參數,第一個是指定任務。向進程池提交一個任務,實現了串行和同步調用。結束任務後,立馬會拿到結果。
開啟的進程數目有幾個,就會有幾個pid。
什麽是同步調用:提交一個任務,等到任務結束後才能執行下一個任務。
# import multiprocessing # import time # import random # import os # def walk(n): # print(‘%s is walking‘%os.getpid()) # time.sleep(random.random()) # return n # # if __name__==‘__main__‘: # p=multiprocessing.Pool(4) # for i in range(10): # q=p.apply(walk,args=(i,)) # print(q)
apply_async:向進程池提交任務,提交完任務後就不管了,只管提交任務,不能直接執行任務。實現了一個並發和惡異步調用
執行方法:先close:關閉任務,好讓任務結束
然後join:等待一個進程池不在提交任務,並且任務結束和計算任務個數
最後get:獲取返回值
什麽是異步調用:提交完一個任務過後不會在原地等待,而是繼續提交下一個任務。等待所有的任務結束後在用get獲取任務
如下:
# import multiprocessing # import time # import random # import os # def walk(n): # print(‘%s is walking‘%os.getpid()) # time.sleep(random.random()) # return n # # if __name__==‘__main__‘: # p=multiprocessing.Pool(4) # li=[] # for i in range(10): # q=p.apply_async(walk,args=(i,)) # li.append(q) # p.close() # p.join() # for i in li: # print(i.get()) #
四 回調函數
什麽是回調函數:通過一個函數內存調用的函數,如果將這個函數的地址當作參數傳給另一個函數,當這個函數的地址用來調用其所只想的函數是,這個所指向的函數就是回調函數。詳情訪問:http://www.cnblogs.com/hainan-zhang/p/6222552.html
callback:後面加上的是回調函數
回調函數的進程其實就是主進程。
用回調函數實現一個網絡爬蟲;需要用到requests模塊
get:獲取網址
status_code:返回狀態碼
text:查看下載網址的內容
from multiprocessing import Pool,Process import requests import os import time,random def get(url): print(‘%s GET %s‘ %(os.getpid(),url)) response=requests.get(url) time.sleep(random.randint(1,3)) if response.status_code == 200: print(‘%s DONE %s‘ % (os.getpid(), url)) return {‘url‘:url,‘text‘:response.text} def parse(dic): print(‘%s PARSE %s‘ %(os.getpid(),dic[‘url‘])) time.sleep(1) res=‘%s:%s\n‘ %(dic[‘url‘],len(dic[‘text‘])) with open(‘db.txt‘,‘a‘) as f: f.write(res) if __name__ == ‘__main__‘: urls=[ ‘https://www.baidu.com‘, ‘https://www.python.org‘, ‘https://www.openstack.org‘, ‘https://help.github.com/‘, ‘http://www.sina.com.cn/‘ ] p=Pool(2) start_time=time.time() objs=[] for url in urls: obj=p.apply_async(get,args=(url,),callback=parse) #主進程負責幹回調函數的活 objs.append(obj) p.close() p.join() print(‘主‘,(time.time()-start_time))
重修課程day32(網絡編程六之進程三)