
# How to Execute Multiple Web Requests in Parallel with Python 3.5 (Without aiohttp)

> The author's production environment had just been upgraded from 2.6 to 3.5.0, which still falls short of aiohttp's minimum version requirement. Hence this article: how to restructure your code to take full advantage of the asynchronous features that asyncio provides in Python 3.5. Original article link


Our IT department recently finished upgrading the distributed Python version in our work environment to 3.5.0. Coming from 2.6, this is a huge upgrade, but it still leaves something to be desired: 3.5.0 does not satisfy the minimum version requirements of some libraries, including aiohttp.

Despite this limitation, I still needed to write a script that fetched hundreds of csv files from our API and then processed the data. Python is not event-driven and natively asynchronous the way NodeJS is, but that doesn't prevent Python from achieving the same thing. This article details how I learned to perform operations asynchronously, and lays out the advantages of doing so.

Disclaimer: if you are on a newer version (3.5.2+), I strongly recommend using aiohttp instead. It is a very robust library, particularly well suited to this kind of problem, and there are plenty of tutorials for it online.
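For readers who do meet that requirement, a minimal aiohttp sketch of the same idea might look like the following (the URLs here are placeholders, not the files used later in this article):

import asyncio
import aiohttp

async def fetch_one(session, url):
    # aiohttp performs the request without blocking the event loop
    async with session.get(url) as response:
        return await response.text()

async def fetch_all():
    async with aiohttp.ClientSession() as session:
        # Fire off both requests concurrently and wait for all the bodies
        return await asyncio.gather(
            fetch_one(session, "https://example.com/a.csv"),
            fetch_one(session, "https://example.com/b.csv"),
        )

loop = asyncio.get_event_loop()
results = loop.run_until_complete(fetch_all())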

Assumptions

This article assumes that you:

> * Are familiar with Python and its syntax
> * Are familiar with basic web requests
> * Understand the concept of asynchronous execution

Getting Started

Install requests:

$ python -m pip install requests

If you don't have the permissions for that, install it for your user instead:

$ python -m pip install requests --user

The Wrong Way: Synchronous Requests

To show off the benefits of parallelism, let's first look at the synchronous approach. I'll briefly describe what the code is going to do: execute a GET request that fetches a csv file, and measure how long it takes to read the text inside.

We will download multiple csv files from this address (https://people.sc.fsu.edu/~jburkardt/data/csv/), which hosts plenty of sample data.

To be clear, we will use the Session object from the requests library to execute our GET requests.
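A Session carries configuration that applies to every request it makes, which is why the code below notes that headers or authentication should be set on it before the fetch loop starts. A small sketch (the header value is just an example):

import requests

with requests.Session() as session:
    # Anything set on the session is sent with every request it makes
    session.headers.update({"User-Agent": "csv-fetcher-example"})
    # session.auth = ("user", "pass")  # authentication works the same way
    response = session.get("https://people.sc.fsu.edu/~jburkardt/data/csv/cities.csv")
    print(response.status_code)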

First, we need a function to execute the web request:

def fetch(session, csv):
    base_url = "https://people.sc.fsu.edu/~jburkardt/data/csv/"
    with session.get(base_url + csv) as response:
        data = response.text
        if response.status_code != 200:
            print("FAILURE::{0}".format(url))
        # Return .csv data for future consumption
        return data

This function takes a Session object and a csv name, executes the web request, and returns the text content of the response.
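As a quick sanity check, you could call it on a single file by hand:

import requests

with requests.Session() as session:
    # Download one file and show its first two lines
    csv_data = fetch(session, "cities.csv")
    print("\n".join(csv_data.splitlines()[:2]))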

Next, we need a function that iterates over the list of files, requests each one, and tracks how long the requests take.

import requests
from timeit import default_timer

def get_data_synchronous():
    csvs_to_fetch = [
        "ford_escort.csv",
        "cities.csv",
        "hw_25000.csv",
        "mlb_teams_2012.csv",
        "nile.csv",
        "homes.csv",
        "hooke.csv",
        "lead_shot.csv",
        "news_decline.csv",
        "snakes_count_10000.csv",
        "trees.csv",
        "zillow.csv"
    ]

    with requests.Session() as session:
        print("{0:<30} {1:>20}".format("File", "Completed at"))
        
        # Set any session parameters here before calling `fetch`
        # For instance, if you needed to set Headers or Authentication
        # this can be done before starting the loop
        
        total_start_time = default_timer()
        for csv in csvs_to_fetch:
            fetch(session, csv)
            elapsed = default_timer() - total_start_time
            time_completed_at = "{:5.2f}s".format(elapsed)
            print("{0:<30} {1:>20}".format(csv, time_completed_at))

This function creates a Session object and then iterates over each file in csvs_to_fetch. As each fetch completes, it computes the time elapsed since the loop started and prints it in a readable format.

Finally, the main function call:

def main():
    # Simple for now
    get_data_synchronous()

main()

The complete synchronous code:


import requests
from timeit import default_timer

def fetch(session, csv):
    base_url = "https://people.sc.fsu.edu/~jburkardt/data/csv/"
    with session.get(base_url + csv) as response:
        data = response.text
        if response.status_code != 200:
            print("FAILURE::{0}".format(url))
        # Return .csv data for future consumption
        return data

def get_data_synchronous():
    csvs_to_fetch = [
        "ford_escort.csv",
        "cities.csv",
        "hw_25000.csv",
        "mlb_teams_2012.csv",
        "nile.csv",
        "homes.csv",
        "hooke.csv",
        "lead_shot.csv",
        "news_decline.csv",
        "snakes_count_10000.csv",
        "trees.csv",
        "zillow.csv"
    ]

    with requests.Session() as session:
        print("{0:<30} {1:>20}".format("File", "Completed at"))
        
        # Set any session parameters here before calling `fetch`
        # For instance, if you needed to set Headers or Authentication
        # this can be done before starting the loop
        
        total_start_time = default_timer()
        for csv in csvs_to_fetch:
            fetch(session, csv)
            elapsed = default_timer() - total_start_time
            time_completed_at = "{:5.2f}s".format(elapsed)
            print("{0:<30} {1:>20}".format(csv, time_completed_at))

def main():
    # Simple for now
    get_data_synchronous()

main()

Results:

Synchronous run. Notice that each request must complete before the next one starts.

Thanks to Python 3's asyncio, we can improve performance dramatically.

The Right Way: Multiple Asynchronous Requests at Once

To make this work, we first have to rework the existing code, starting with fetch:

import requests
from timeit import default_timer

# We'll need access to this variable later
START_TIME = default_timer()

def fetch(session, csv):
    base_url = "https://people.sc.fsu.edu/~jburkardt/data/csv/"
    with session.get(base_url + csv) as response:
        data = response.text
        if response.status_code != 200:
            print("FAILURE::{0}".format(url))
        # Now we will print how long it took to complete the operation from the 
        # `fetch` function itself
        elapsed = default_timer() - START_TIME
        time_completed_at = "{:5.2f}s".format(elapsed)
        print("{0:<30} {1:>20}".format(csv, time_completed_at))

        return data

Next, rework get_data into an asynchronous function:

import asyncio
from timeit import default_timer
from concurrent.futures import ThreadPoolExecutor

async def get_data_asynchronous():
    csvs_to_fetch = [
        "ford_escort.csv",
        "cities.csv",
        "hw_25000.csv",
        "mlb_teams_2012.csv",
        "nile.csv",
        "homes.csv",
        "hooke.csv",
        "lead_shot.csv",
        "news_decline.csv",
        "snakes_count_10000.csv",
        "trees.csv",
        "zillow.csv"
    ]
    print("{0:<30} {1:>20}".format("File", "Completed at"))
    
    # Note: max_workers is set to 10 simply for this example,
    # you'll have to tweak with this number for your own projects
    # as you see fit
    with ThreadPoolExecutor(max_workers=10) as executor:
        with requests.Session() as session:
            # Set any session parameters here before calling `fetch`

            # Initialize the event loop        
            loop = asyncio.get_event_loop()
            
            # Reset the module-level START_TIME used by the `fetch` function;
            # without `global`, this assignment would only create a local variable
            global START_TIME
            START_TIME = default_timer()
            
            # Use list comprehension to create a list of
            # tasks to complete. The executor will run the `fetch`
            # function for each csv in the csvs_to_fetch list
            tasks = [
                loop.run_in_executor(
                    executor,
                    fetch,
                    *(session, csv) # Allows us to pass in multiple arguments to `fetch`
                )
                for csv in csvs_to_fetch
            ]
            
            # Initializes the tasks to run and awaits their results
            for response in await asyncio.gather(*tasks):
                pass

The code now runs the fetch function on a pool of worker threads, one task per csv file to download.
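Two details are worth noting. First, run_in_executor only accepts positional arguments, which is what the *(session, csv) unpacking relies on; if fetch ever needed keyword arguments, functools.partial is the usual workaround (a sketch, not part of the original code):

import functools

# Equivalent to loop.run_in_executor(executor, fetch, session, csv),
# but partial() can also bind keyword arguments
tasks = [
    loop.run_in_executor(executor, functools.partial(fetch, session, csv))
    for csv in csvs_to_fetch
]

Second, this approach shares one requests.Session across all worker threads. That works well in practice here, but requests does not formally guarantee that Session is thread-safe, so a more conservative design would give each thread its own session.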

Finally, our main function also needs a small change to initialize the asynchronous function correctly.

def main():
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(get_data_asynchronous())
    loop.run_until_complete(future)

main()
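As an aside, on Python 3.7 or newer this boilerplate shrinks to a single call (shown for comparison only; it does not exist in 3.5):

def main():
    # asyncio.run() creates the loop, runs the coroutine, and closes the loop
    asyncio.run(get_data_asynchronous())

main()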

Run it again and look at the results:

Asynchronous run. Notice that the files no longer complete in order.

With these slight modifications, the 12 files download in 3.43s instead of 10.84s, cutting the download time by nearly 70%. Here is the complete asynchronous code:

import requests
import asyncio
from concurrent.futures import ThreadPoolExecutor
from timeit import default_timer

START_TIME = default_timer()

def fetch(session, csv):
    base_url = "https://people.sc.fsu.edu/~jburkardt/data/csv/"
    with session.get(base_url + csv) as response:
        data = response.text
        if response.status_code != 200:
            print("FAILURE::{0}".format(url))

        elapsed = default_timer() - START_TIME
        time_completed_at = "{:5.2f}s".format(elapsed)
        print("{0:<30} {1:>20}".format(csv, time_completed_at))

        return data

async def get_data_asynchronous():
    csvs_to_fetch = [
        "ford_escort.csv",
        "cities.csv",
        "hw_25000.csv",
        "mlb_teams_2012.csv",
        "nile.csv",
        "homes.csv",
        "hooke.csv",
        "lead_shot.csv",
        "news_decline.csv",
        "snakes_count_10000.csv",
        "trees.csv",
        "zillow.csv"
    ]
    print("{0:<30} {1:>20}".format("File", "Completed at"))
    with ThreadPoolExecutor(max_workers=10) as executor:
        with requests.Session() as session:
            # Set any session parameters here before calling `fetch`
            loop = asyncio.get_event_loop()
            global START_TIME  # reset the module-level timer used by `fetch`
            START_TIME = default_timer()
            tasks = [
                loop.run_in_executor(
                    executor,
                    fetch,
                    *(session, csv) # Allows us to pass in multiple arguments to `fetch`
                )
                for csv in csvs_to_fetch
            ]
            for response in await asyncio.gather(*tasks):
                pass

def main():
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(get_data_asynchronous())
    loop.run_until_complete(future)

main()

I hope you enjoyed this article and can apply these techniques to projects that are stuck on older versions of Python. Even though Python does not make async/await quite as simple as NodeJS does, you can still achieve similar results.