1. 程式人生 > >python基礎-程序池、submit同異步呼叫、shutdown引數、ProcessPoolExecutor程序池、程序池ftp

python基礎-程序池、submit同異步呼叫、shutdown引數、ProcessPoolExecutor程序池、程序池ftp

引入程序池

在學習執行緒池之前,我們先看一個例子

from multiprocessing import  Process
import  time
def task(name):
    print("name",name)
    time.sleep(1)

if __name__ == "__main__":
    start = time.time()
    p1 = Process(target=task,args=("safly1",))
    p2 = Process(target=task, args=("safly2",))
    p3 = Process(target=task, args=("safly3"
,)) p1.start() p2.start() p3.start() p1.join() p2.join() p3.join() print("main") end = time.time() print(end- start)

輸出如下:

name safly1
name safly2
name safly3
main
1.2071197032928467

以上的方式是一個個建立程序,這樣的耗費時間才1秒多,雖然高效,但是有什麼弊端呢?
如果併發很大的話,會給伺服器帶來很大的壓力,所以引入了程序池的概念

使用ProcessPoolExecutor程序池

Python3.2開始,標準庫為我們提供了concurrent.futures模組,它提供了ThreadPoolExecutor和ProcessPoolExecutor兩個類,實現了對threading和multiprocessing的進一步抽象,對編寫執行緒池/程序池提供了直接的支援。

通過ProcessPoolExecutor 來做示例。
我們來看一個最簡單的程序池

from concurrent.futures import ProcessPoolExecutor
import  time
def task(name):
print("name",name) time.sleep(1) if __name__ == "__main__": start = time.time() ex = ProcessPoolExecutor(2) for i in range(5): ex.submit(task,"safly%d"%i) ex.shutdown(wait=True) print("main") end = time.time() print(end - start)

輸出如下:

E:\python\python_sdk\python.exe "E:/python/py_pro/4 程序池.py"
name safly0
name safly1
name safly2
name safly3
name safly4
main
3.212218999862671

簡單解釋下:
ProcessPoolExecutor(2)建立一個程序池,容量為2,迴圈submit出5個程序,然後就線上程池佇列裡面,執行多個程序,ex.shutdown(wait=True)意思是程序都執行完畢,在執行主程序的內容

使用shutdown

ex.shutdown(wait=True)是程序池內部的程序都執行完畢,才會關閉,然後執行後續程式碼
如果改成false呢?看如下程式碼

from concurrent.futures import ProcessPoolExecutor
import  time
def task(name):
    print("name",name)
    time.sleep(1)

if __name__ == "__main__":
    start = time.time()
    ex = ProcessPoolExecutor(2)

    for i in range(5):
        ex.submit(task,"safly%d"%i)
    ex.shutdown(wait=False)

    print("main")
    end = time.time()
    print(end - start)

輸出如下:

main
0.01500844955444336
name safly0
name safly1
name safly2
name safly3
name safly4

使用submit同步呼叫

同步呼叫:提交/呼叫一個任務,然後就在原地等著,等到該任務執行完畢拿到結果,再執行下一行程式碼

from concurrent.futures import ProcessPoolExecutor
import time, random, os

def piao(name, n):
    print('%s is piaoing %s' % (name, os.getpid()))
    time.sleep(1)
    return n ** 2


if __name__ == '__main__':
    p = ProcessPoolExecutor(2)
    start = time.time()
    for i in range(5):
        res=p.submit(piao,'safly %s' %i,i).result() #同步呼叫
        print(res)

    p.shutdown(wait=True)
    print('主', os.getpid())

    stop = time.time()
    print(stop - start)

輸出如下:

E:\python\python_sdk\python.exe "E:/python/py_pro/4 程序池.py"
safly 0 is piaoing 12996
0
safly 1 is piaoing 14044
1
safly 2 is piaoing 12996
4
safly 3 is piaoing 14044
9
safly 4 is piaoing 12996
1612932
5.202786684036255

Process finished with exit code 0

使用submit非同步呼叫

非同步呼叫: 提交/呼叫一個任務,不在原地等著,直接執行下一行程式碼


# from multiprocessing import Process,Pool
from concurrent.futures import ProcessPoolExecutor
import time, random, os

def piao(name, n):
    print('%s is piaoing %s' % (name, os.getpid()))
    time.sleep(1)
    return n ** 2


if __name__ == '__main__':
    p = ProcessPoolExecutor(2)
    objs = []
    start = time.time()
    for i in range(5):
        obj = p.submit(piao, 'safly %s' % i, i)  # 非同步呼叫
        objs.append(obj)

    p.shutdown(wait=True)
    print('主', os.getpid())
    for obj in objs:
        print(obj.result())

    stop = time.time()
    print(stop - start)

輸出如下:

E:\python\python_sdk\python.exe "E:/python/py_pro/4 程序池.py"
safly 0 is piaoing 1548
safly 1 is piaoing 7872

safly 2 is piaoing 1548
safly 3 is piaoing 7872


safly 4 is piaoing 15487808
0
1
4
9
16
3.202626943588257

輸出資訊的換行是我標識有輸出停頓的
簡單說下執行流程:
由於程序池容量是容納2個程序,所以會2+2+1 三次進入執行緒池執行,花費3秒

如果我們改下上面的程式碼,修改的程式碼如下:

from concurrent.futures import ProcessPoolExecutor
import time, random, os

def piao(name, n):
    print('%s is piaoing %s' % (name, os.getpid()))
    time.sleep(1)
    return n ** 2


if __name__ == '__main__':
    p = ProcessPoolExecutor(2)
    objs = []
    start = time.time()
    for i in range(5):
        obj = p.submit(piao, 'safly %s' % i, i)  # 非同步呼叫
        objs.append(obj)

    for obj in objs:
        print(obj.result())


    p.shutdown(wait=True)
    print('主', os.getpid())


    stop = time.time()
    print(stop - start)

輸出如下:(同樣我用換行,標識出輸出的時間段了)

E:\python\python_sdk\python.exe "E:/python/py_pro/4 程序池.py"
safly 0 is piaoing 7852
safly 1 is piaoing 8484


safly 2 is piaoing 7852
0
safly 3 is piaoing 8484
1



safly 4 is piaoing 7852
4
9


166816
3.178352117538452

程序池實現ftp

服務端:

from socket import *
from concurrent.futures import ProcessPoolExecutor
import os

server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)

def talk(conn,client_addr):
    print('程序pid: %s' %os.getpid())
    while True:
        try:
            msg=conn.recv(1024)
            if not msg:break
            conn.send(msg.upper())
        except Exception:
            break

if __name__ == '__main__':
    p=ProcessPoolExecutor(5)
    while True:
        conn,client_addr=server.accept()
        p.submit(talk,conn,client_addr)

客戶端:

from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))


while True:
    msg=input('>>: ').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))