# 如何使用Python3.5並行執行多個web請求(不適用aiohttp
> 作者的生產環境剛從2.6升級到3.5.0,但滿足不了aiohttp的最低版本需求。所以在有了這篇文章,如何改造程式碼,充分利用python3.5 asyncio提供的非同步功能。原文連結
近日IT部門最終將我們工作環境的分散式Python版本升級到了3.5.0。這對從2.6版本來說是一次巨大的升級,但依然有些遺憾。3.5.0 不能滿足一些庫的最小版本需求,這其中就包括aiohttp。
儘管有這些限制,我依然需要寫指令碼從我們的API獲取數以百計的csv檔案,然後處理資料。Python本身並不想NodeJS那樣基於事件驅動和原生非同步,但這並不妨礙Python 也能實現一樣的功能。這篇文件將詳細介紹我如何學習非同步操作,並列出它的優勢。
宣告: 如果你有更高的版本(3.5.2+),強烈推薦你使用aiohttp。這是個非常健壯的庫, 特別適合解決這類問題。網上也有很多關於她的教程。
假設
作如下假設:
> * 熟悉Python和它的語法 > * 熟悉基礎的網路請求 > * 知道非同步執行的概念
開始
安裝requests
$ python -m pip install requests
沒有許可權可以做如下安裝
$ python -m pip install requests --user
錯誤的做法:同步請求
為了體現並行的好處,先看看同步的做法。我大概描述一下程式碼將要做什麼。我們要執行一個能獲取csv檔案的GET請求,測量讀取其中文字的時間。
我們將從這個網址(https://people.sc.fsu.edu/~jburkardt/data/csv/)下載多個csv檔案, 裡面有很多例項資料。
在說明一下,我們將用requests 庫裡 Session物件,執行GET請求。
首先,需要一個方法執行web請求:
def fetch(session, csv): base_url = "https://people.sc.fsu.edu/~jburkardt/data/csv/" with session.get(base_url + csv) as response: data = response.text if response.status_code != 200: print("FAILURE::{0}".format(url)) # Return .csv data for future consumption return data
這個函式使用Session物件和csv名字,執行網路請求,然後返回response裡的文字內容。
下面,我們需要一個函式遍歷檔案列表,然後去請求,統計執行請求的時間。
from timeit import default_timer()
def get_data_synchronous():
csvs_to_fetch = [
"ford_escort.csv",
"cities.csv",
"hw_25000.csv",
"mlb_teams_2012.csv",
"nile.csv",
"homes.csv",
"hooke.csv",
"lead_shot.csv",
"news_decline.csv",
"snakes_count_10000.csv",
"trees.csv",
"zillow.csv"
]
with requests.Session() as session:
print("{0:<30} {1:>20}".format("File", "Completed at"))
# Set any session parameters here before calling `fetch`
# For instance, if you needed to set Headers or Authentication
# this can be done before starting the loop
total_start_time = default_timer()
for csv in csvs_to_fetch:
fetch(session, csv)
elapsed = default_timer() - total_start_time
time_completed_at = "{:5.2f}s".format(elapsed)
print("{0:<30} {1:>20}".format(csv, time_completed_at))
這個函式建立了一個Session物件,然後遍歷csvs_to_fetch裡的每個檔案。一旦fetch操作結束, 就將計算下載時間,並以易讀的格式展示。
最後main函式呼叫:
def main():
# Simple for now
get_data_synchronous()
main()
同步執行的完整程式碼
import requests
from timeit import default_timer
def fetch(session, csv):
base_url = "https://people.sc.fsu.edu/~jburkardt/data/csv/"
with session.get(base_url + csv) as response:
data = response.text
if response.status_code != 200:
print("FAILURE::{0}".format(url))
# Return .csv data for future consumption
return data
def get_data_synchronous():
csvs_to_fetch = [
"ford_escort.csv",
"cities.csv",
"hw_25000.csv",
"mlb_teams_2012.csv",
"nile.csv",
"homes.csv",
"hooke.csv",
"lead_shot.csv",
"news_decline.csv",
"snakes_count_10000.csv",
"trees.csv",
"zillow.csv"
]
with requests.Session() as session:
print("{0:<30} {1:>20}".format("File", "Completed at"))
# Set any session parameters here before calling `fetch`
# For instance, if you needed to set Headers or Authentication
# this can be done before starting the loop
total_start_time = default_timer()
for csv in csvs_to_fetch:
fetch(session, csv)
elapsed = default_timer() - total_start_time
time_completed_at = "{:5.2f}s".format(elapsed)
print("{0:<30} {1:>20}".format(csv, time_completed_at))
def main():
# Simple for now
get_data_synchronous()
main()
結果:
多虧了Python3 asyncio, 通過它我們可以大幅度提高效能。
正確的解決辦法: 一次執行多個非同步請求
為了能起作用,我們要先重做現有的程式碼。從fetch開始:
import requests
from timeit import default_timer
# We'll need access to this variable later
START_TIME = default_timer()
def fetch(session, csv):
base_url = "https://people.sc.fsu.edu/~jburkardt/data/csv/"
with session.get(base_url + csv) as response:
data = response.text
if response.status_code != 200:
print("FAILURE::{0}".format(url))
# Now we will print how long it took to complete the operation from the
# `fetch` function itself
elapsed = default_timer() - START_TIME
time_completed_at = "{:5.2f}s".format(elapsed)
print("{0:<30} {1:>20}".format(csv, time_completed_at))
return data
下一步, 改造get_data為非同步函式
import asyncio
from timeit import default_timer
from concurrent.futures import ThreadPoolExecutor
async def get_data_asynchronous():
csvs_to_fetch = [
"ford_escort.csv",
"cities.csv",
"hw_25000.csv",
"mlb_teams_2012.csv",
"nile.csv",
"homes.csv",
"hooke.csv",
"lead_shot.csv",
"news_decline.csv",
"snakes_count_10000.csv",
"trees.csv",
"zillow.csv"
]
print("{0:<30} {1:>20}".format("File", "Completed at"))
# Note: max_workers is set to 10 simply for this example,
# you'll have to tweak with this number for your own projects
# as you see fit
with ThreadPoolExecutor(max_workers=10) as executor:
with requests.Session() as session:
# Set any session parameters here before calling `fetch`
# Initialize the event loop
loop = asyncio.get_event_loop()
# Set the START_TIME for the `fetch` function
START_TIME = default_timer()
# Use list comprehension to create a list of
# tasks to complete. The executor will run the `fetch`
# function for each csv in the csvs_to_fetch list
tasks = [
loop.run_in_executor(
executor,
fetch,
*(session, csv) # Allows us to pass in multiple arguments to `fetch`
)
for csv in csvs_to_fetch
]
# Initializes the tasks to run and awaits their results
for response in await asyncio.gather(*tasks):
pass
現在的程式碼建立了多個執行緒,為每個csv檔案執行fetch函式進行下載。
最後,我們的mian函式為了正確的初始化非同步函式,也需要稍微做些修改。
def main():
loop = asyncio.get_event_loop()
future = asyncio.ensure_future(get_data_asynchronous())
loop.run_until_complete(future)
main()
再執行下,看看結果:
略微修改後,12個檔案的下載時間3.43s vs 10.84s。下載時間減少了近70%。
import requests
import asyncio
from concurrent.futures import ThreadPoolExecutor
from timeit import default_timer
START_TIME = default_timer()
def fetch(session, csv):
base_url = "https://people.sc.fsu.edu/~jburkardt/data/csv/"
with session.get(base_url + csv) as response:
data = response.text
if response.status_code != 200:
print("FAILURE::{0}".format(url))
elapsed = default_timer() - START_TIME
time_completed_at = "{:5.2f}s".format(elapsed)
print("{0:<30} {1:>20}".format(csv, time_completed_at))
return data
async def get_data_asynchronous():
csvs_to_fetch = [
"ford_escort.csv",
"cities.csv",
"hw_25000.csv",
"mlb_teams_2012.csv",
"nile.csv",
"homes.csv",
"hooke.csv",
"lead_shot.csv",
"news_decline.csv",
"snakes_count_10000.csv",
"trees.csv",
"zillow.csv"
]
print("{0:<30} {1:>20}".format("File", "Completed at"))
with ThreadPoolExecutor(max_workers=10) as executor:
with requests.Session() as session:
# Set any session parameters here before calling `fetch`
loop = asyncio.get_event_loop()
START_TIME = default_timer()
tasks = [
loop.run_in_executor(
executor,
fetch,
*(session, csv) # Allows us to pass in multiple arguments to `fetch`
)
for csv in csvs_to_fetch
]
for response in await asyncio.gather(*tasks):
pass
def main():
loop = asyncio.get_event_loop()
future = asyncio.ensure_future(get_data_asynchronous())
loop.run_until_complete(future)
main()
希望你喜歡這篇文章,並將這些技術應用到必須使用舊版本Python的專案。 儘管Python沒有簡單的async / await 模式,但要取得類似的結