1. 程式人生 > >並發編程——協程

並發編程——協程

hid 1-1 單純 執行 1.9 sogou 第三方庫 解決 -c

協程:

  基於單線程來實現並發。

  協程並不是實際存在的實體,本質上是一個線程的多個部分。

  比線程的單位更小——協程,纖程,在一個線程中可以開啟很多協程。

  在執行程序的過程中,遇到 IO 操作就凍結當前位置的狀態,去執行其他任務,在執行其他任務的過程中,會不斷地檢測上一個凍結的任務是否 IO 結束,如果 IO 結束了,就繼續從凍結的位置開始執行。

  一個線程不會遇到阻塞——一直在使用CPU。

  多個線程——只能有一個線程使用CPU。

  協程比線程之間的切換和線程的創建銷毀所花費的時間,空間開銷要小的多。

協程的特點:凍結當前程序/任務的執行狀態,可以規避IO操作的時間。

技術分享圖片
import
time def producer(): res = [] for i in range(1000000): res.append(i) return res def consumer(res): for i in res:pass start = time.time() res = producer() consumer(res) print(time.time()-start) # 0.26484227180480957 def producer(): for i in range(1000000): yield i def consumer(): g
= producer() for i in g:pass start = time.time() consumer() print(time.time() - start) # 0.09993767738342285 import time def consumer(): while True: x=yield def producer(): g=consumer() next(g) for i in range(10000000): g.send(i) start = time.time() producer() print(time.time() - start) #
1.6259949207305908 # 單純的切換,還是要耗費一些時間的,記住當前執行的狀態。 # 用時間換了空間
協程的引子

   協程:是單線程下的開發,又稱微線程,纖程。

  協程是一種用戶態的輕量級線程,即協程是由用戶程序自己控制調度的。

需要強調的是:

  1,python的線程是屬於內核級別的,即由操作系統控制調度(如單線程遇到io或執行時間過長就會被迫交出cpu執行權限,切換其他線程運行。)

  2,單線程內開啟協程,一旦遇到io,就會從應用程序級別(而非操作系統)控制切換,以此來提升效率。(!!!非io操作的切換與效率無關)

對比操作系統控制線程的切換,用戶在線程內控制協程的切換:

優點如下:

  1,協程的切換開銷更小,屬於程序級別的切換,操作系統完全感知不到,因而更加輕量級。

  2,單線程內就可以實現並發的效果,最大限度地利用cpu.

缺點如下:

  1,協程的本質是單線程下,無法利用多核,可以是一個程序開啟多個進程,每個進程內開啟多個線程,每個線程內開啟多個協程。

  2,協程指的是單個線程,因而一旦協程出現阻塞,將會阻塞整個線程。

總結協程的特點:

  1,必須在只有一個單線程裏實現並發。

  2,修改共享數據不需要加鎖。

  3,用戶程序裏自己保存多個控制流的上下文棧。

  4,一個協程遇到 io 操作自動切換到其他協程

Greenlet模塊:

安裝:pip3 install greenlet

技術分享圖片
import time
from greenlet import greenlet

def func1(name):
    print(%s%name,123)
    g2.switch(小白)
    time.sleep(1)
    print(%s%name,abc)

def func2(name):
    time.sleep(1)
    print(%s%name,456)
    g1.switch()

g1 = greenlet(func1)    # 實例化
g2 = greenlet(func2)
g1.switch(清秋) # 開始運行 可以在第一次switch時傳入參數,以後就不用了。
‘‘‘
清秋 123
小白 456
清秋 abc
‘‘‘
greenlet實現狀態切換

單純的切換(在沒有io的情況下或者沒有重復開辟內存空間的操作),反而會降低程序的執行速度。

技術分享圖片
# 順序執行
import time

def f1():
    res = 1
    for i in range(10000000):
        res += i

def f2():
    res = 1
    for i in range(10000000):
        res *= i

start = time.time()
f1()
f2()
print(time.time()-start)    # 1.5120854377746582


# 切換執行
from greenlet import greenlet
import time
def f1():
    res = 1
    for i in range(10000000):
        res += i
        g2.switch()

def f2():
    res = 1
    for i in range(10000000):
        res *= i
        g1.switch()

start = time.time()
g1 = greenlet(f1)
g2 = greenlet(f2)
g1.switch()
print(time.time()-start)    # 1.9758000373840332

# 由上可知,單純的切換,反而會降低了程序的執行速度。
效率對比

  greenlet 只是提供了一種比generator更加便捷的切換方式,當切到一個任務執行時,如果遇到io,那就原地阻塞,仍然是沒有解決遇到IO自動切換來提升效率的問題。

  單線程裏的這20個任務的代碼通常會既有計算操作,又有阻塞操作,所以我們可以在這些時間去執行其他任務,這樣就能提高效率,這就用到了gevent模塊。

Gevent模塊:

  安裝:pip3 install gevent

  gevent 是一個第三方庫,可以輕松通過gevent實現並發同步或異步編程,在gevent中用到的主要模式是greenlet,它是以C擴展模塊形式介入Python的輕量級協程。greenlet全部運行在主程序操作系統進程的內部,但他們被協作式的調度。

技術分享圖片
g1=gevent.spawn(func,1,,2,3,x=4,y=5)創建一個協程對象g1,spawn括號內第一個參數是函數名,如eat,後面可以有多個參數,可以是位置實參或關鍵字實參,都是傳給函數eat的

g2=gevent.spawn(func2)

g1.join() #等待g1結束

g2.join() #等待g2結束

#或者上述兩步合作一步:gevent.joinall([g1,g2])

g1.value#拿到func1的返回值
用法介紹 技術分享圖片
from gevent import monkey;monkey.patch_all()
# 它會把下面導入的所有模塊中的IO操作都打成一個包,gevent就能夠識別這些IO操作了。
import time
import gevent
# 使用gevent模塊來執行多個函數,表示在這些函數遇到IO操作的時候可以在同一個線程中進行切換。
# 利用其他任務的IO阻塞時間來切換到其他的任務繼續執行。
# spawn來發布協程任務
# gevent本身並不認識其他模塊中的IO操作,所以只有 from gevent import monkey;monkey.patch_all() 才能識別
# gevent就能夠認是在這句話後導入模塊的IO操作。

from threading import currentThread
def eat():
    print(eating1,currentThread())
    time.sleep(1)
    print(eating2)

def play():
    print(playing1,currentThread())
    time.sleep(1)
    print(playing2)
g1 = gevent.spawn(eat)
g2 = gevent.spawn(play)
time.sleep(1)  # 停一會等待執行完畢
例子 技術分享圖片
from gevent import monkey;monkey.patch_all()
import time
import gevent
def eat(name):
    print(%s eat1 % name)
    time.sleep(1)
    print(%s eat2 % name)

def play(name):
    print(%s play1 % name)
    time.sleep(1)
    print(%s play2 % name)

g1 = gevent.spawn(eat,egon)
g2 = gevent.spawn(play,alex)
g1.join()
g2.join()
# 可以直接用 gevent.joinall([g1,g2])
print()
遇到IO自動切換 技術分享圖片
from gevent import monkey;monkey.patch_all()
import threading
import gevent
import time
def eat():
    print(threading.current_thread().getName()) # DummyThread-1
    print(eat1)
    time.sleep(2)
    print(eat2)
def play():
    print(threading.current_thread().getName()) # DummyThread-2
    print(play1)
    time.sleep(2)
    print(play2)

g1 = gevent.spawn(eat)
g2 = gevent.spawn(play)
gevent.joinall([g1,g2])
print()
查看threading.current_thread().getName()

gevent應用舉例:

技術分享圖片
from gevent import monkey;monkey.patch_all()
import time
import gevent
from urllib.request import urlopen

def get_page(url):
    res = urlopen(url)
    print(len(res.read()))

url_lst = [
    http://www.baidu.com,
    http://www.sogou.com,
    http://www.sohu.com,
    http://www.qq.com,
    http://www.cnblogs.com,
]
start = time.time()
gevent.joinall([gevent.spawn(get_page,url) for url in url_lst])
print(time.time()-start) # 1.0084402561187744
# 網頁讀取有一個機制,第一次讀取的時候時間都會普遍久
# 會將讀取的網頁緩存下來,以便下次的讀取用。

start = time.time()
gevent.joinall([gevent.spawn(get_page,url) for url in url_lst])
print(time.time()-start) # 0.5516667366027832

start = time.time()
for url in url_lst:
    get_page(url)
print(time.time()-start)    # 1.533193588256836

# 所以我們可以通過時間看出,協程爬取是比普通遍歷快很多。
協程應用,爬蟲

通過協程實現單線程下的socket並發:

技術分享圖片
from gevent import monkey;monkey.patch_all()
import socket
import gevent

def async_talk(conn):
    try:
        while True:
            conn.send(bhello)
            ret = conn.recv(1024)
            print(ret)
            # 為了實現能夠一直和同一個客戶端聊天。
    finally:
        conn.close()  # 在程序報錯的時候會關閉連接,節省空間

sk = socket.socket()
sk.bind((127.0.0.1,9000))
sk.listen()
while True:
    conn,addr = sk.accept() # 因為循環,所以可重復接收多個多個客戶端的連接
    gevent.spawn(async_talk,conn)   # 創建協程,將conn當參數傳入函數

sk.close()
server 技術分享圖片
import socket
from threading import Thread

def chat():
    sk = socket.socket()    # 放在函數內部,則每次一個線程就會有一個新的sk。
    sk.connect((127.0.0.1,9000))
    while True:     # 循環對話。
        print(sk.recv(1024))
        sk.send(bbye)
    sk.close()
for i in range(500): # 創建500個線程客戶端
    Thread(target=chat).start()
client

並發編程——協程