1. 程式人生 > >python中多程序+協程的使用以及為什麼要用它

python中多程序+協程的使用以及為什麼要用它

前面講了為什麼python裡推薦用多程序而不是多執行緒,但是多程序也有其自己的限制:相比執行緒更加笨重、切換耗時更長,並且在python的多程序下,程序數量不推薦超過CPU核心數(一個程序只有一個GIL,所以一個程序只能跑滿一個CPU),因為一個程序佔用一個CPU時能充分利用機器的效能,但是程序多了就會出現頻繁的程序切換,反而得不償失。

不過特殊情況(特指IO密集型任務)下,多執行緒是比多程序好用的。

舉個例子:給你200W條url,需要你把每個url對應的頁面抓取儲存起來,這種時候,單單使用多程序,效果肯定是很差的。為什麼呢?

例如每次請求的等待時間是2秒,那麼如下(忽略cpu計算時間):

1、單程序+單執行緒:需要2秒*200W=400W秒==1111.11個小時==46.3天,這個速度明顯是不能接受的

2、單程序+多執行緒:例如我們在這個程序中開了10個多執行緒,比1中能夠提升10倍速度,也就是大約4.63天能夠完成200W條抓取,請注意,這裡的實際執行是:執行緒1遇見了阻塞,CPU切換到執行緒2去執行,遇見阻塞又切換到執行緒3等等,10個執行緒都阻塞後,這個程序就阻塞了,而直到某個執行緒阻塞完成後,這個程序才能繼續執行,所以速度上提升大約能到10倍(這裡忽略了執行緒切換帶來的開銷,實際上的提升應該是不能達到10倍的),但是需要考慮的是執行緒的切換也是有開銷的,所以不能無限的啟動多執行緒(開200W個執行緒肯定是不靠譜的)

3、多程序+多執行緒:這裡就厲害了,一般來說也有很多人用這個方法,多程序下,每個程序都能佔一個cpu,而多執行緒從一定程度上繞過了阻塞的等待,所以比單程序下的多執行緒又更好使了,例如我們開10個程序,每個程序裡開20W個執行緒,執行的速度理論上是比單程序開200W個執行緒快10倍以上的(為什麼是10倍以上而不是10倍,主要是cpu切換200W個執行緒的消耗肯定比切換20W個程序大得多,考慮到這部分開銷,所以是10倍以上)。

還有更好的方法嗎?答案是肯定的,它就是:

4、協程,使用它之前我們先講講what/why/how(它是什麼/為什麼用它/怎麼使用它)

what:

協程是一種使用者級的輕量級執行緒。協程擁有自己的暫存器上下文和棧。協程排程切換時,將暫存器上下文和棧儲存到其他地方,在切回來的時候,恢復先前儲存的暫存器上下文和棧。因此:

協程能保留上一次呼叫時的狀態(即所有區域性狀態的一個特定組合),每次過程重入時,就相當於進入上一次呼叫的狀態,換種說法:進入上一次離開時所處邏輯流的位置。

在併發程式設計中,協程與執行緒類似,每個協程表示一個執行單元,有自己的本地資料,與其它協程共享全域性資料和其它資源。

why:

目前主流語言基本上都選擇了多執行緒作為併發設施,與執行緒相關的概念是搶佔式多工(Preemptive multitasking),而與協程相關的是協作式多工。

不管是程序還是執行緒,每次阻塞、切換都需要陷入系統呼叫(system call),先讓CPU跑作業系統的排程程式,然後再由排程程式決定該跑哪一個程序(執行緒)。
而且由於搶佔式排程執行順序無法確定的特點,使用執行緒時需要非常小心地處理同步問題,而協程完全不存在這個問題(事件驅動和非同步程式也有同樣的優點)。

因為協程是使用者自己來編寫排程邏輯的,對CPU來說,協程其實是單執行緒,所以CPU不用去考慮怎麼排程、切換上下文,這就省去了CPU的切換開銷,所以協程在一定程度上又好於多執行緒。

how:

python裡面怎麼使用協程?答案是使用gevent,使用方法:看這裡

使用協程,可以不受執行緒開銷的限制,我嘗試過一次把20W條url放在單程序的協程裡執行,完全沒問題。

所以最推薦的方法,是多程序+協程(可以看作是每個程序裡都是單執行緒,而這個單執行緒是協程化的)

多程序+協程下,避開了CPU切換的開銷,又能把多個CPU充分利用起來,這種方式對於資料量較大的爬蟲還有檔案讀寫之類的效率提升是巨大的。

小例子:

#-*- coding=utf-8 -*-
import requests
from multiprocessing import Process
import gevent
from gevent import monkey; monkey.patch_all()

import sys
reload(sys)
sys.setdefaultencoding('utf8')
def fetch(url):
    try:
        s = requests.Session()
        r = s.get(url,timeout=1)#在這裡抓取頁面
    except Exception,e:
        print e 
    return ''
 
def process_start(url_list):
    tasks = []
    for url in url_list:
        tasks.append(gevent.spawn(fetch,url))
    gevent.joinall(tasks)#使用協程來執行

def task_start(filepath,flag = 100000):#每10W條url啟動一個程序
    with open(filepath,'r') as reader:#從給定的檔案中讀取url
        url = reader.readline().strip()
        url_list = []#這個list用於存放協程任務
        i = 0 #計數器,記錄添加了多少個url到協程佇列
        while url!='':
            i += 1
            url_list.append(url)#每次讀取出url,將url新增到佇列
            if i == flag:#一定數量的url就啟動一個程序並執行
                p = Process(target=process_start,args=(url_list,))
                p.start()
                url_list = [] #重置url佇列
                i = 0 #重置計數器
            url = reader.readline().strip()
        if url_list not []:#若退出迴圈後任務佇列裡還有url剩餘
            p = Process(target=process_start,args=(url_list,))#把剩餘的url全都放到最後這個程序來執行
            p.start()
  
if __name__ == '__main__':
    task_start('./testData.txt')#讀取指定檔案
 


細心的同學會發現:上面的例子中隱藏了一個問題:程序的數量會隨著url數量的增加而不斷增加,我們在這裡不使用程序池multiprocessing.Pool來控制程序數量的原因是multiprocessing.Pool和gevent有衝突不能同時使用,但是有興趣的同學可以研究一下gevent.pool這個協程池。