1. 程式人生 > >17、第七周-網絡編程 - 協程概念介紹、協程gevent模塊並發爬網頁

17、第七周-網絡編程 - 協程概念介紹、協程gevent模塊並發爬網頁

似的 soc 註解 單線程 部分 ESS 封裝 控制流 能力


協程,又稱微線程,纖程。什麽是線程:協程是一種用戶態的輕量級線程。

  協程擁有自己的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其他地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。因此:協程能保留上一次調用時的狀態(即所有局部狀態的一個特定組合),每次過程重入時,就相當於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。

協程的好處:

  • 無需線程上下文切換的開銷
  • 無需原子操作鎖定及同步的開銷(註解:"原子操作(atomic operation)是不需要synchronized",所謂原子操作是指不會被線程調度機制打斷的操作;這種操作一旦開始,就一直運行到結束,中間不會有任何 (切換到另一個線程。原子操作可以是一個步驟,也可以是多個操作步驟,但是其順序是不可以被打亂,或者切割掉只執行部分。視作整體是原子性的核心。
  • 方便切換控制流,簡化編程模型
  • 高並發+高擴展性+低成本:一個CPU支持上萬的協程都不是問題。所以很適合用於高並發處理。

協程的缺點:

  • 無法利用多核資源:協程的本質是個單線程,它不能同時將 單個CPU 的多個核用上,協程需要和進程配合才能運行在多CPU上.當然我們日常所編寫的絕大部分應用都沒有這個必要,除非是cpu密集型應用。
  • 進行阻塞(Blocking)操作(如IO時)會阻塞掉整個程序

符合協程的標準

  1. 必須在只有一個單線程裏實現並發
  2. 修改共享數據不需加鎖
  3. 用戶程序裏自己保存多個控制流的上下文棧
  4. 一個協程遇到IO操作自動切換到其它協程

舉例:

1、使用yield實現協程操作:

import time
import queue


def consumer(name):
    print("--->starting eating baozi...")
    while True:
        new_baozi = yield
        print("[%s] is eating baozi %s" % (name, new_baozi))
        # time.sleep(1)


def producer():
    r = con.__next__()
    r = con2.__next__()
    n = 0
    while n < 5:
        n += 1
        con.send(n)
        con2.send(n)
        print("\033[32;1m[producer]\033[0m is making baozi %s" % n)


if __name__ == ‘__main__‘:
    con = consumer("c1")
    con2 = consumer("c2")
    p = producer()
輸出:
--->starting eating baozi...
--->starting eating baozi...
[c1] is eating baozi 1
[c2] is eating baozi 1
[producer] is making baozi 1
[c1] is eating baozi 2
[c2] is eating baozi 2
[producer] is making baozi 2
[c1] is eating baozi 3
[c2] is eating baozi 3
[producer] is making baozi 3
[c1] is eating baozi 4
[c2] is eating baozi 4
[producer] is making baozi 4
[c1] is eating baozi 5
[c2] is eating baozi 5
[producer] is making baozi 5

2、使用第三方模塊:greenlet (手動指定執行切換協程)

greenlet是一個用C實現的協程模塊,相比與python自帶的yield,它可以使你在任意函數之間隨意切換,而不需把這個函數先聲明為generator。

  greenlet是python的並行處理的一個庫。 python 有一個非常有名的庫叫做 stackless ,用來做並發處理, 主要是弄了個叫做tasklet的微線程的東西, 而greenlet 跟stackless的最大區別是greenlet需要你自己來處理線程切換, 就是說,你需要自己指定現在執行哪個greenlet再執行哪個greenlet。相當於手動切換協程。

  一個 “greenlet” 是一個小型的獨立偽線程。可以把它想像成一些棧幀,棧底是初始調用的函數,而棧頂是當前greenlet的暫停位置。你使用greenlet創建一堆這樣的堆棧,然後在他們之間跳轉執行。跳轉必須顯式聲明的:一個greenlet必須選擇要跳轉到的另一個greenlet,這會讓前一個掛起,而後一個在此前掛起處恢復執行。不同greenlets之間的跳轉稱為切換(switching) 。

  當你創建一個greenlet時,它得到一個開始時為空的棧;當你第一次切換到它時,它會執行指定的函數,這個函數可能會調用其他函數、切換跳出greenlet等等。當最終棧底的函數執行結束出棧時,這個greenlet的棧又變成空的,這個greenlet也就死掉了。greenlet也會因為一個未捕捉的異常死掉。

舉例:

from greenlet import greenlet

def test1():
    print(12) #2、打印
    gr2.switch()#3、切換協程到 test2
    print(34)#6、打印
    gr2.switch()#7、切換到test2

def test2():
    print(56)#4、打印
    gr1.switch() #5、切換協程到 test1
    print(78)#8、打印,執行完成

gr1 = greenlet(test1) #啟動一個協程
gr2 = greenlet(test2) #啟動一個協程
gr1.switch()  #1、開始調用切換協程

輸出:
12
56
34
78

註意:執行的步驟順序,從1-8。

以上例子還不能實現在協程中自動切換,greenlet 只能手動指定執行,但對於生成器來說簡單很多。要實現自動監控,並且自動切換協程,如何實現?引入gevent模塊。

3、使用第三方模塊:gevent (自動監控並自動切換協程)

  Gevent 是一個第三方庫,可以輕松通過gevent實現並發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet全部運行在主程序操作系統進程的內部,但它們被協作式地調度。

  Gevent是第三方庫,通過greenlet實現協程,其基本思想是:當一個greenlet遇到IO操作時,比如訪問網絡,就自動切換到其他的greenlet,等到IO操作完成,再在適當的時候切換回來繼續執行。由於IO操作非常耗時,經常使程序處於等待狀態,有了gevent為我們自動切換協程,就保證總有greenlet在運行,而不是等待IO。由於切換是在IO操作時自動完成,所以gevent需要修改Python自帶的一些標準庫,這一過程在啟動時通過monkey patch完成。

包含的特性:
1.基於libev的快速事件循環
2.基於greenlet的輕量級執行單元
3.重用Python標準庫且概念相似的API
4.支持SSL的協作socket
5.通過c-ares或者線程池進行DNS查詢
6.使用標準庫和第三方庫中使用了阻塞socket的代碼的能力

第三庫需要另外,開源包進行安裝。

舉例:

A、驗證gevent通過自動判斷,選擇最優的線路進行判斷執行。註意:可gevent.sleep() 調整時間,進行驗證測試。結論:每個函數裏面最後一次打印,看等待的時間越短越先執行。

import gevent

def foo():
    print(‘running foo‘)
    gevent.sleep(3)
    print(‘Explicit context switch to foo again‘)
def bar():
    print(‘running bar‘)
    gevent.sleep(6)
    print(‘Implicit context switch back to bar ‘)
def func3():
    print("running func3")
    gevent.sleep(1)
    print ("switch back to func3")

gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
    gevent.spawn(func3)
])
輸出:
running foo
running bar
running func3
switch back to func3
Explicit context switch to foo again
Implicit context switch back to bar

B、同步與異步性能對區別,如下:

  程序的重要部分是將task函數封裝到Greenlet內部線程的gevent.spawn。 初始化的greenlet列表存放在數組threads中,此數組被傳給gevent.joinall 函數,後者阻塞當前流程,並執行所有給定的greenlet。執行流程只會在 所有greenlet執行完後才會繼續向下走。

import gevent

def task(pid):
    gevent.sleep(1)
    print(‘Task %s done‘ % pid)

def synchronous():
    for i in range(1, 6): #range從1到5打印
        task(i)

def asynchronous():
    threads = [gevent.spawn(task, i) for i in range(5)]
    gevent.joinall(threads) #


print(‘Synchronous:‘)
synchronous() #正常函數,串行的調用會每個1s打印一次

print(" ")

print(‘Asynchronous:‘)
asynchronous() #並行打印,等待一次性打印出來
輸出:
每相隔一秒打印
Synchronous:
Task 1 done
Task 2 done
Task 3 done
Task 4 done
Task 5 done
 
等待同一時間打印
Asynchronous:
Task 0 done
Task 1 done
Task 2 done
Task 3 done
Task 4 done

C、gevent協程爬取網頁,遇到IO阻塞時會自動切換業務。舉例如下:

from gevent import monkey;
monkey.patch_all()

import gevent,time
from  urllib.request import urlopen

def f(url):
    print(‘GET: %s‘ % url)
    resp = urlopen(url)
    data = resp.read()
    #print(data) #打印爬取到的網頁內容
    print(‘%d bytes received from %s.‘ % (len(data), url))

time_start = time.time()

urls = [‘http://www.cnblogs.com/alex3714/articles/5248247.html‘,
	    ‘http://www.cnblogs.com/chen170615/p/8797609.html‘,
	    ‘http://www.cnblogs.com/chen170615/p/8761768.html‘,
        ]
for i in urls:
    f(i)

print("同步執行時間:",time.time() - time_start)
print (" ")
async_time_start = time.time()
gevent.joinall([
    gevent.spawn(f, ‘http://www.cnblogs.com/alex3714/articles/5248247.html‘),
    gevent.spawn(f, ‘http://www.cnblogs.com/chen170615/p/8797609.html‘),
    gevent.spawn(f,‘http://www.cnblogs.com/chen170615/p/8761768.html‘)
])
print("異步執行時間:",time.time() - async_time_start)
輸出:
GET: http://www.cnblogs.com/alex3714/articles/5248247.html
92147 bytes received from http://www.cnblogs.com/alex3714/articles/5248247.html.
GET: http://www.cnblogs.com/chen170615/p/8797609.html
10930 bytes received from http://www.cnblogs.com/chen170615/p/8797609.html.
GET: http://www.cnblogs.com/chen170615/p/8761768.html
11853 bytes received from http://www.cnblogs.com/chen170615/p/8761768.html.
同步執行時間 20.319132089614868
 
GET: http://www.cnblogs.com/alex3714/articles/5248247.html
GET: http://www.cnblogs.com/chen170615/p/8797609.html
GET: http://www.cnblogs.com/chen170615/p/8761768.html
11853 bytes received from http://www.cnblogs.com/chen170615/p/8761768.html.
10930 bytes received from http://www.cnblogs.com/chen170615/p/8797609.html.
92147 bytes received from http://www.cnblogs.com/alex3714/articles/5248247.html.
異步執行時間: 0.28768205642700195

以上例子可以看出,gevent協程異步並發執行的性能高於同步串行的執行,遇到會等待的IO同時,異步的性能就表現的優異起來。(多執行幾次,就能看出對比。)

D、過gevent實現單線程下的多socket並發

舉例:

服務端:

協程gevent_socket_server.py

import sys,socket,time,gevent

from gevent import socket,monkey

monkey.patch_all()

def server(port):
    gevent_server = socket.socket()
    gevent_server.bind((‘0.0.0.0‘,port))
    gevent_server.listen()
    while True:
        cli,addr = gevent_server.accept()
        gevent.spawn(handle_request,cli)

def handle_request(conn):
    try:
        while True:
            data = conn.recv(1024)
            print("recv:",data)
            conn.send(data)
            if not data:
                conn.shutdown(socket.SHUT_WR)

    except Exception as ex:
        print(ex)

    finally:
        conn.close()

if __name__ == "__main__":
    server(8001)

客戶端兩種類型,如下:

客戶端1:協程gevent_socket_client.py(普通的手工輸入模式)

import socket

HOST = "localhost"
PORT = 8001

s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect((HOST,PORT))

while True:
    msg = bytes(input(">>:").strip(),encoding="utf-8")
    s.sendall(msg)
    data = s.recv(1024)

    print(‘Received‘,repr(data))

s.close()

客戶端2:協程gevent_socket_cli.py(通過起進程的方式,並發執行)

import socket,threading

HOST = "localhost"
PORT = 8001

def sock_conn():

   s = socket.socket()
   s.connect((HOST,PORT))
   count = 0

   while True:
       s.sendall(("hello %s" % count).encode("utf-8"))
       data = s.recv(1024)

       print("[%s]recv from server:" % threading.get_ident(),data.decode())
       count += 1
   s.close()

for i in range(10): #測試註意數值,不要設置太大。要不然,機器回被卡死
    t = threading.Thread(target=sock_conn)
    t.start()

17、第七周-網絡編程 - 協程概念介紹、協程gevent模塊並發爬網頁