1. 程式人生 > >Python資料抓取——多執行緒,非同步

Python資料抓取——多執行緒,非同步

作業系統可以同時執行多個任務。首先,考慮單核CPU是如何執行多工的:作業系統輪流讓各個任務交替執行,任務1執行0.01秒,切換到任務2,任務2執行0.01秒,再切換到任務3,執行0.01秒……這樣反覆執行下去。表面上看,每個任務都是交替執行的,但是,由於CPU的執行速度非常快,給人的感覺就像所有任務都在同時執行一樣。真正的並行執行多工只能在多核CPU上實現,但是,由於任務數量遠遠多於CPU的核心數量,所以,作業系統也會自動把很多工輪流排程到每個核心上執行

對於作業系統來說,一個任務就是一個程序(Process),比如開啟一個瀏覽器就是啟動一個瀏覽器程序,開啟一個記事本就啟動了一個記事本程序,開啟兩個記事本就啟動了兩個記事本程序,開啟一個Word就啟動了一個Word程序。有些程序還不止同時幹一件事,比如Word,它可以同時進行打字、拼寫檢查、列印等事情。在一個程序內部,要同時幹多件事,就需要同時執行多個“子任務”,我們把程序內的這些“子任務”稱為執行緒(Thread)

。由於每個程序至少要幹一件事,所以一個程序至少有一個執行緒多執行緒的執行方式和多程序是一樣的,也是由作業系統在多個執行緒之間快速切換,讓每個執行緒都短暫地交替執行,看起來就像同時執行一樣,真正能同時執行多執行緒需要多核CPU才可能實現

我們前面編寫的所有的Python程式,都是執行單任務的程序,也就是隻有一個執行緒。如果要同時執行多個任務有3種方案:一種是啟動多個程序,每個程序只開一個執行緒,但多個程序可以一塊執行多個任務。還有一種方法是啟動一個程序,在一個程序內啟動多個執行緒,多個執行緒也可以一塊執行多個任務。第三種方法,就是啟動多個程序,每個程序再啟動多個執行緒,這樣同時執行的任務就更多了,這種模型很複雜,實際很少採用

。多工的實現有3種方式:多程序模式;多執行緒模式;多程序+多執行緒模式。同時執行多個任務通常各個任務之間需要相互通訊和協調,有時,任務1必須暫停等待任務2完成後才能繼續執行,有時,任務3和任務4又不能同時執行,所以,多程序和多執行緒的程式的複雜度要遠遠高於我們前面寫的單程序單執行緒的程式。因為複雜度高,除錯困難,所以,不是迫不得已,我們也不想編寫多工。但是,有很多時候,沒有多工還真不行。想想在電腦上看電影,就必須由一個執行緒播放視訊,另一個執行緒播放音訊,否則,單執行緒實現的話就只能先把視訊播放完再播放音訊,或者先把音訊播放完再播放視訊,這顯然是不行的。

Python既支援多程序,又支援多執行緒。多工可以由多程序完成,也可以由一個程序內的多執行緒完成。程序是由若干執行緒組成的,一個程序至少有一個執行緒。由於執行緒是作業系統直接支援的執行單元,因此,高階語言通常都內建多執行緒的支援,Python也不例外,並且,Python的執行緒是真正的Posix Thread,而不是模擬出來的執行緒。Python的標準庫提供了兩個模組:thread和threading,thread是低階模組,threading是高階模組,對thread進行了封裝。絕大多數情況下,我們只需要使用threading這個高階模組。啟動一個執行緒就是把一個函式傳入並建立Thread例項,然後呼叫start()開始執行。

import requests
import threading

def get_stock(code):
    url = 'http://hq.sinajs.cn/list=' + code
    resp = requests.get(url)
    print('%s\n' % resp.text)

#多執行緒非同步,加速抓取
#根據有幾個股票程式碼,就建立幾個執行緒
codes = ['sz000878', 'sh600993', 'sz000002', 'sz002230']
threads = [threading.Thread(target=get_stock, args=(code, )) for code in codes]
#Thread建立執行緒例項
'''
threads=[ ]
for code in codes:
    thread=threading.Thread(target=get_stock,args=(code, ))
    threads.append(thread)
'''
for t in threads:
    t.start()  #啟動一個執行緒
for t in threads:
    t.join()  #等待每個執行緒執行結束

這裡寫圖片描述

多工用執行緒池自動排程

import requests
import threadpool  #執行緒池

def get_stock(code):
    url = 'http://hq.sinajs.cn/list=' + code
    resp = requests.get(url)
    print('%s\n' % resp.text)

codes = ['sz000878', 'sh600993', 'sz000002', 'sz002230']
#codes裡任務很多,比如幾百個,讓pool自己去排程
pool = threadpool.ThreadPool(2) #執行緒池設定,最多同時跑兩個執行緒
tasks = threadpool.makeRequests(get_stock, codes)
#makeRequests構造執行緒task請求,第一個引數是執行緒函式,第二個是引數陣列
[pool.putRequest(task) for task in tasks]
#列表推導式,putRequest向執行緒池裡加task,讓pool自己去排程task
pool.wait() #等所有任務結束

這裡寫圖片描述

非同步
交出當前CPU的控制權,最大化利用當前單個CPU的效率

import aiohttp #表示http請求是非同步方式去請求的
import asyncio #當非同步請求返回時,通知非同步操作完成

#非同步可以參考grequests庫的使用:https://github.com/kennethreitz/grequests
async def get_stock(code):
#關鍵字async表示請求是非同步的
    url = 'http://hq.sinajs.cn/list=' + code
    resp = await aiohttp.request('GET', url) # yield
    #await表示任務等待時,不佔用CPU資源,通知請求返回
    body = await resp.read()
    #表示從網路上把請求的東西都讀回來
    text = body.decode('gb2312') #對讀回來的原始位元組解碼
    print(text)
    resp.close()

codes = ['sz000878', 'sh600993', 'sz000002', 'sz002230']
tasks = [get_stock(code) for code in codes]
#由於是非同步請求,這裡get_stock(code)並不會被馬上執行,只是佔用了一個位置

loop = asyncio.get_event_loop()  #loop的作用是——做完任務,事件通知
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

#tasks生成一組併發的非同步任務,loop表示非同步作用完成後等待通知