1. 程式人生 > >python全棧開發基礎【第二十二篇】進程池和回調函數

python全棧開發基礎【第二十二篇】進程池和回調函數

enc 並發執行 exce 核數 exc 為什麽 .py bsp urn

一、數據共享

1.進程間的通信應該盡量避免共享數據的方式

2.進程間的數據是獨立的,可以借助隊列或管道實現通信,二者都是基於消息傳遞的。

雖然進程間數據獨立,但可以用過Manager實現數據共享,事實上Manager的功能遠不止於此。

命令就是一個程序,按回車就會執行(這個只是在windows情況下)
tasklist 查看進程
tasklist | findstr  pycharm   #(findstr是進行過濾的),|就是管道(tasklist執行的內容就放到管道裏面了,
管道後面的findstr  pycharm就接收了)

3.(IPC)進程之間的通信有兩種實現方式:管道和隊列

# 數據共享
from multiprocessing import Manager,Process,Lock
def work(dic,mutex):
    # mutex.acquire()
    # dic[‘count‘]-=1
    # mutex.release()
    # 也可以這樣加鎖
    with mutex:
        dic[‘count‘] -= 1
if __name__ == ‘__main__‘:
    mutex = Lock()
    m = Manager()  #實現共享,由於字典是共享的字典,所以得加個鎖
    share_dic = m.dict({‘count‘:100})
    p_l = []
    for i in range(100):
        p = Process(target=work,args=(share_dic,mutex))
        p_l.append(p)  #先添加進去
        p.start()
    for i in p_l:
        i.join()
    print(share_dic)
# 共享就意味著會有競爭,

 

二、進程池

在利用Python進行系統管理的時候,特別是同時操作多個文件目錄,或者遠程控制多臺主機,並行操作可以節約大量的時間。多進程是實現並發的手段之一,需要註意的問題是:

  1. 很明顯需要並發執行的任務通常要遠大於核數
  2. 一個操作系統不可能無限開啟進程,通常有幾個核就開幾個進程
  3. 進程開啟過多,效率反而會下降(開啟進程是需要占用系統資源的,而且開啟多余核數目的進程也無法做到並行)

例如當被操作對象數目不大時,可以直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但如果是上百個,上千個。。。手動的去限制進程數量卻又太過繁瑣,此時可以發揮進程池的功效。

那麽什麽是進程池呢?進程池就是通過資源池的形式控制進程數目。

對於遠程過程調用的高級應用程序而言,應該使用進程池,Pool可以提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,如果池還沒有滿,那麽就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到規定最大值,那麽該請求就會等待,直到池中有進程結束,就重用進程池中的進程。

進程池的結構:

創建進程池的類:如果指定numprocess為3,則進程池會從無到有創建三個進程,然後自始至終使用這三個進程去執行所有任務,不會開啟其他進程

1.創建進程池

Pool([numprocess  [,initializer [, initargs]]]):創建進程池

2.參數介紹

numprocess:要創建的進程數,如果省略,將默認為cpu_count()的值,可os.cpu_count()查看
initializer:是每個工作進程啟動時要執行的可調用對象,默認為None
initargs:是要傳給initializer的參數組

3.方法介紹

p.apply(func [, args [, kwargs]]):在一個池工作進程中執行
func(*args,**kwargs),然後返回結果。
需要強調的是:此操作並不會在所有池工作進程中並執行func函數。
如果要通過不同參數並發地執行func函數,必須從不同線程調用p.apply()
函數或者使用p.apply_async()
 
 
p.apply_async(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然後返回結果。此方法的結果是AsyncResult類的實例,
callback是可調用對象,接收輸入參數。當func的結果變為可用時,
將理解傳遞給callback。callback禁止執行任何阻塞操作,
否則將接收其他異步操作中的結果。
    
p.close():關閉進程池,防止進一步操作。禁止往進程池內在添加任務(需要註意的是一定要寫在close()的上方)

P.jion():等待所有工作進程退出。此方法只能在close()或teminate()之後調用

應用1:

  

# apply同步進程池(阻塞)(串行)
from multiprocessing import Pool
import os,time
def task(n):
    print(‘[%s] is running‘%os.getpid())
    time.sleep(2)
    print(‘[%s] is done‘%os.getpid())
    return n**2
if __name__ == ‘__main__‘:
    # print(os.cpu_count())  #查看cpu個數
    p = Pool(4) #最大四個進程
    for i in range(1,7):#開7個任務
        res = p.apply(task,args=(i,))  #同步的,等著一個運行完才執行另一個
        print(‘本次任務的結束:%s‘%res)
    p.close()#禁止往進程池內在添加任務
    p.join() #在等進程池
    print(‘主‘)
# apply_async異步進程池(非阻塞)(並行)
# ----------------
# 那麽我們為什麽要用進程池呢?這是因為進程池使用來控制進程數目的,
# 我們需要幾個就開幾個進程。如果不用進程池實現並發的話,會開很多的進程
# 如果你開的進程特別多,那麽你的機器就會很卡,所以我們把進程控制好,用幾個就
# 開幾個,也不會太占用內存
from multiprocessing import Pool
import os,time
def walk(n):
    print(‘task[%s] running...‘%os.getpid())
    time.sleep(3)
    return n**2
if __name__ == ‘__main__‘:
     p = Pool(4)
     res_obj_l = []
     for i in range(10):
         res = p.apply_async(walk,args=(i,))
         # print(res)  #打印出來的是對象
         res_obj_l.append(res)  #那麽現在拿到的是一個列表,怎麽得到值呢?我們用個.get方法
     p.close() #禁止往進程池裏添加任務
     p.join()
     # print(res_obj_l)
     print([obj.get() for obj in res_obj_l])  #這樣就得到了

  

同步和異步的區別

同步就是指一個進程在執行某個請求的時候,若該請求需要一段時間才能返回信息,那麽這個進程將會一直等待下去,直到收到返回信息才繼續執行下去

異步是指進程不需要一直等下去,而是繼續執行下面的操作,不管其他進程的狀態。當有消息返回時系統會通知進程進行處理,這樣可以提高執行的效率。

串行和並行區別

舉例:能並排開幾輛車的就可以說是“並行”,只能一輛一輛開的就屬於“串行”了。很明顯,並行的速度要比串行的快得多。(並行互不影響,串行的等著一個完了才能接著另一個)

應用2:

使用進程池維護固定數目的進程(以前客戶端和服務端的改進)

#服務端
from socket import *
from multiprocessing import Pool
s = socket(AF_INET,SOCK_STREAM)
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #端口重用
s.bind((‘127.0.0.1‘,8081))
s.listen(5)
print(‘start running...‘)
def talk(coon,addr):
    while True:  # 通信循環
        try:
            cmd = coon.recv(1024)
            print(cmd.decode(‘utf-8‘))
            if not cmd: break
            coon.send(cmd.upper())
            print(‘發送的是%s‘%cmd.upper().decode(‘utf-8‘))
        except Exception:
            break
    coon.close()
if __name__ == ‘__main__‘:
    p = Pool(4)
    while True:#鏈接循環
        coon,addr = s.accept()
        print(coon,addr)
        p.apply_async(talk,args=(coon,addr))
    s.close()
#因為是循環,所以就不用p.join了
#客戶端
from socket import *
c = socket(AF_INET,SOCK_STREAM)
c.connect((‘127.0.0.1‘,8081))
while True:
    cmd = input(‘>>:‘).strip()
    if not cmd:continue
    c.send(cmd.encode(‘utf-8‘))
    data = c.recv(1024)
    print(‘接受的是%s‘%data.decode(‘utf-8‘))
c.close()

三、回調函數

回調函數什麽時候用?(回調函數在爬蟲中最常用)
造數據的非常耗時
處理數據的時候不耗時
 
你下載的地址如果完成了,就自動提醒讓主進程解析
誰要是好了就通知解析函數去解析(回調函數的強大之處)

需要回調函數的場景:進程池中任何一個任務一旦處理完了,就立即告知主進程:我好了額,你可以處理我的結果了。主進程則調用一個函數去處理該結果,該函數即回調函數

我們可以把耗時間(阻塞)的任務放到進程池中,然後指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。

#回調函數(下載網頁的小例子)
from  multiprocessing import Pool
import requests
import os
import time
def get_page(url):
    print(‘<%s> is getting [%s]‘ %(os.getpid(),url))
    response = requests.get(url)  #得到地址
    time.sleep(2)
    print(‘<%s> is  done [%s]‘%(os.getpid(),url))
    return {‘url‘:url,‘text‘:response.text}
def parse_page(res):
    ‘‘‘解析函數‘‘‘
    print(‘<%s> parse [%s]‘%(os.getpid(),res[‘url‘]))
    with open(‘db.txt‘,‘a‘) as f:
        parse_res = ‘url:%s size:%s\n‘ %(res[‘url‘],len(res[‘text‘]))
        f.write(parse_res)
if __name__ == ‘__main__‘:
    p = Pool(4)
    urls = [
        ‘https://www.baidu.com‘,
        ‘http://www.openstack.org‘,
        ‘https://www.python.org‘,
        ‘https://help.github.com/‘,
        ‘http://www.sina.com.cn/‘
    ]
    for url in urls:
        obj = p.apply_async(get_page,args=(url,),callback=parse_page)
    p.close()
    p.join()
    print(‘主‘,os.getpid())  #都不用.get()方法了

如果在主進程中等待進程池中所有任務都執行完畢後,再統一處理結果,則無需回調函數

#下載網頁小例子(無需回調函數)
from  multiprocessing import Pool
import requests
import os
def get_page(url):
    print(‘<%os> get [%s]‘ %(os.getpid(),url))
    response = requests.get(url)  #得到地址  response響應
    return {‘url‘:url,‘text‘:response.text}
if __name__ == ‘__main__‘:
    p = Pool(4)
    urls = [
        ‘https://www.baidu.com‘,
        ‘http://www.openstack.org‘,
        ‘https://www.python.org‘,
        ‘https://help.github.com/‘,
        ‘http://www.sina.com.cn/‘
    ]
    obj_l= []
    for url in urls:
        obj = p.apply_async(get_page,args=(url,))
        obj_l.append(obj)
    p.close()
    p.join()
    print([obj.get() for obj in obj_l])

  

python全棧開發基礎【第二十二篇】進程池和回調函數