1. 程式人生 > >程序(六):程序池(Pool)

程序(六):程序池(Pool)

目錄

程序池

multiprocess.Pool模組

概念介紹

引數介紹

主要方法

其他方法(瞭解)

程式碼例項

回撥函式


程序池

為什麼要有程序池?程序池的概念。

在程式實際處理問題過程中,忙時會有成千上萬的任務需要被執行,閒時可能只有零星任務。那麼在成千上萬個任務需要被執行的時候,我們就需要去建立成千上萬個程序麼?首先,建立程序需要消耗時間,銷燬程序也需要消耗時間。第二即便開啟了成千上萬的程序,作業系統也不能讓他們同時執行,這樣反而會影響程式的效率。因此我們不能無限制的根據任務開啟或者結束程序。那麼我們要怎麼做呢?

在這裡,要給大家介紹一個程序池的概念,定義一個池子,在裡面放上固定數量的程序,有需求來了,就拿一個池中的程序來處理任務,等到處理完畢,程序並不關閉,而是將程序再放回程序池中繼續等待任務。如果有很多工需要執行,池中的程序數量不夠,任務就要等待之前的程序執行任務完畢歸來,拿到空閒程序才能繼續執行。也就是說,池中程序的數量是固定的,那麼同一時間最多有固定數量的程序在執行。這樣不會增加作業系統的排程難度,還節省了開閉程序的時間,也一定程度上能夠實現併發效果。

multiprocess.Pool模組

概念介紹

Pool([numprocess  [,initializer [, initargs]]]):建立程序池

引數介紹

1 numprocess:要建立的程序數,如果省略,將預設使用cpu_count()的值

2 initializer:是每個工作程序啟動時要執行的可呼叫物件,預設為None

3 initargs:是要傳給initializer(可迭代)的引數組

主要方法

1、p.apply(func [, args [, kwargs]]):在一個池工作程序中執行func(*args,**kwargs),然後返回結果。
'''需要強調的是:此操作並不會在所有池工作程序中並執行func函式。如果要通過不同引數併發地執行func函式,必須從不同執行緒呼叫p.apply()函式或者使用p.apply_async()'''

2、p.apply_async(func [, args [, kwargs]]):在一個池工作程序中執行func(*args,**kwargs),然後返回結果。
'''此方法的結果是AsyncResult類的例項,callback是可呼叫物件,接收輸入引數。當func的結果變為可用時,將理解傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他非同步操作中的結果。'''
   
3、p.close():關閉程序池,防止進一步操作。如果所有操作持續掛起,它們將在工作程序終止前完成

4、P.jion():等待所有工作程序退出。此方法只能在close()或teminate()之後呼叫

其他方法(瞭解)

方法apply_async()和map_async()的返回值是AsyncResul的例項obj。例項具有以下方法
●obj.get():返回結果,如果有必要則等待結果到達。timeout是可選的。如果在指定時間內還沒有到達,將引發一場。如果遠端操作中引發了異常,它將在呼叫此方法時再次被引發。
●obj.ready():如果呼叫完成,返回True
●obj.successful():如果呼叫完成且沒有引發異常,返回True,如果在結果就緒之前呼叫此方法,引發異常
●obj.wait([timeout]):等待結果變為可用。
●obj.terminate():立即終止所有工作程序,同時不執行任何清理或結束任何掛起工作。如果p被垃圾回收,將自動呼叫此函式

程式碼例項

程序池和多程序效率對比

import time
from multiprocessing import Pool, Process

def func(n):
    for i in range(10):
        print(n + 1)

if __name__ == '__main__':
    start = time.time()
    pool = Pool(5)  # 5個程序池
    pool.map(func, range(100))  # 100個任務
    t1 = time.time() - start

    start = time.time()
    p_lst = []
    for i in range(100):
        p = Process(target=func, args=(i,))   #100個程序的任務
        p_lst.append(p)
        p.start()
    for p in p_lst: p.join()
    t2 = time.time() - start
    print(t1, t2)

同步和非同步

#程序池的同步呼叫
import os,time
from multiprocessing import Pool

def work(n):
    print('%s run' %os.getpid())
    time.sleep(3)
    return n**2

if __name__ == '__main__':
    p=Pool(3) #程序池中從無到有建立三個程序,以後一直是這三個程序在執行任務
    res_l=[]
    for i in range(10):
        res=p.apply(work,args=(i,)) # 同步呼叫,直到本次任務執行完畢拿到res,等待任務work執行的過程中可能有阻塞也可能沒有阻塞
                                    # 但不管該任務是否存在阻塞,同步呼叫都會在原地等著
    print(res_l)




#程序池的非同步呼叫
import os
import time
import random
from multiprocessing import Pool

def work(n):
    print('%s run' %os.getpid())
    time.sleep(random.random())
    return n**2

if __name__ == '__main__':
    p=Pool(3) #程序池中從無到有建立三個程序,以後一直是這三個程序在執行任務
    res_l=[]
    for i in range(10):
        res=p.apply_async(work,args=(i,)) # 非同步執行,根據程序池中有的程序數,每次最多3個子程序在非同步執行
                                          # 返回結果之後,將結果放入列表,歸還程序,之後再執行新的任務
                                          # 需要注意的是,程序池中的三個程序不會同時開啟或者同時結束
                                          # 而是執行完一個就釋放一個程序,這個程序就去接收新的任務。  
        res_l.append(res)

    # 非同步apply_async用法:如果使用非同步提交的任務,主程序需要使用jion,等待程序池內任務都處理完,然後可以用get收集結果
    # 否則,主程序結束,程序池可能還沒來得及執行,也就跟著一起結束了
    p.close()
    p.join()
    for res in res_l:
        print(res.get()) #使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get

練習

#server:程序池版socket併發聊天

#Pool內的程序數預設是cpu核數,假設為4(檢視方法os.cpu_count())
#開啟6個客戶端,會發現2個客戶端處於等待狀態
#在每個程序內檢視pid,會發現pid使用為4個,即多個客戶端公用4個程序
from socket import *
from multiprocessing import Pool
import os

server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)

def talk(conn):
    print('程序pid: %s' %os.getpid())
    while True:
        try:
            msg=conn.recv(1024)
            if not msg:break
            conn.send(msg.upper())
        except Exception:
            break

if __name__ == '__main__':
    p=Pool(4)
    while True:
        conn,*_=server.accept()
        p.apply_async(talk,args=(conn,))
        # p.apply(talk,args=(conn,client_addr)) #同步的話,則同一時間只有一個客戶端能訪問



#client
from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))


while True:
    msg=input('>>: ').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))

發現:併發開啟多個客戶端,服務端同一時間只有4個不同的pid,只能結束一個客戶端,另外一個客戶端才會進來.

回撥函式

需要回調函式的場景:程序池中任何一個任務一旦處理完了,就立即告知主程序:我好了額,你可以處理我的結果了。主程序則呼叫一個函式去處理該結果,該函式即回撥函式

我們可以把耗時間(阻塞)的任務放到程序池中,然後指定回撥函式(主程序負責執行),這樣主程序在執行回撥函式時就省去了I/O的過程,直接拿到的是任務的結果。

#使用多程序請求多個url來減少網路等待浪費的時間
import requests
from urllib.request import urlopen
from multiprocessing import Pool


# 200 網頁正常的返回
# 404 網頁找不到
# 502 504
def get(url):
    response = requests.get(url)
    if response.status_code == 200:
        return url, response.content.decode('utf-8')


def get_urllib(url):
    ret = urlopen(url)
    return ret.read().decode('utf-8')


def call_back(args):
    url, content = args
    print(url, len(content))


if __name__ == '__main__':
    url_lst = [
        'https://www.cnblogs.com/',
        'http://www.baidu.com',
        'https://www.sogou.com/',
        'http://www.sohu.com/',
    ]
    p = Pool(5)
    for url in url_lst:
        p.apply_async(get, args=(url,), callback=call_back)
    p.close()
    p.join()

爬蟲例項

import re
from urllib.request import urlopen
from multiprocessing import Pool

def get_page(url,pattern):
    response=urlopen(url).read().decode('utf-8')
    return pattern,response

def parse_page(info):
    pattern,page_content=info
    res=re.findall(pattern,page_content)
    for item in res:
        dic={
            'index':item[0].strip(),
            'title':item[1].strip(),
            'actor':item[2].strip(),
            'time':item[3].strip(),
        }
        print(dic)
if __name__ == '__main__':
    regex = r'<dd>.*?<.*?class="board-index.*?>(\d+)</i>.*?title="(.*?)".*?class="movie-item-info".*?<p class="star">(.*?)</p>.*?<p class="releasetime">(.*?)</p>'
    pattern1=re.compile(regex,re.S)

    url_dic={
        'http://maoyan.com/board/7':pattern1,
    }

    p=Pool()
    res_l=[]
    for url,pattern in url_dic.items():
        res=p.apply_async(get_page,args=(url,pattern),callback=parse_page)
        res_l.append(res)

    for i in res_l:
        i.get()

如果在主程序中等待程序池中所有任務都執行完畢後,再統一處理結果,則無需回撥函式

#無需回撥函式
from multiprocessing import Pool
import time,random,os

def work(n):
    time.sleep(1)
    return n**2
if __name__ == '__main__':
    p=Pool()

    res_l=[]
    for i in range(10):
        res=p.apply_async(work,args=(i,))
        res_l.append(res)

    p.close()
    p.join() #等待程序池中所有程序執行完畢

    nums=[]
    for res in res_l:
        nums.append(res.get()) #拿到所有結果
    print(nums) #主程序拿到所有的處理結果,可以在主程序中進行統一進行處理