並發編程——協程
協程:
基於單線程來實現並發。
協程並不是實際存在的實體,本質上是一個線程的多個部分。
比線程的單位更小——協程,纖程,在一個線程中可以開啟很多協程。
在執行程序的過程中,遇到 IO 操作就凍結當前位置的狀態,去執行其他任務,在執行其他任務的過程中,會不斷地檢測上一個凍結的任務是否 IO 結束,如果 IO 結束了,就繼續從凍結的位置開始執行。
一個線程不會遇到阻塞——一直在使用CPU。
多個線程——只能有一個線程使用CPU。
協程比線程之間的切換和線程的創建銷毀所花費的時間,空間開銷要小的多。
協程的特點:凍結當前程序/任務的執行狀態,可以規避IO操作的時間。
import協程的引子time def producer(): res = [] for i in range(1000000): res.append(i) return res def consumer(res): for i in res:pass start = time.time() res = producer() consumer(res) print(time.time()-start) # 0.26484227180480957 def producer(): for i in range(1000000): yield i def consumer(): g= producer() for i in g:pass start = time.time() consumer() print(time.time() - start) # 0.09993767738342285 import time def consumer(): while True: x=yield def producer(): g=consumer() next(g) for i in range(10000000): g.send(i) start = time.time() producer() print(time.time() - start) #1.6259949207305908 # 單純的切換,還是要耗費一些時間的,記住當前執行的狀態。 # 用時間換了空間
協程:是單線程下的開發,又稱微線程,纖程。
協程是一種用戶態的輕量級線程,即協程是由用戶程序自己控制調度的。
需要強調的是:
1,python的線程是屬於內核級別的,即由操作系統控制調度(如單線程遇到io或執行時間過長就會被迫交出cpu執行權限,切換其他線程運行。)
2,單線程內開啟協程,一旦遇到io,就會從應用程序級別(而非操作系統)控制切換,以此來提升效率。(!!!非io操作的切換與效率無關)
對比操作系統控制線程的切換,用戶在線程內控制協程的切換:
優點如下:
1,協程的切換開銷更小,屬於程序級別的切換,操作系統完全感知不到,因而更加輕量級。
2,單線程內就可以實現並發的效果,最大限度地利用cpu.
缺點如下:
1,協程的本質是單線程下,無法利用多核,可以是一個程序開啟多個進程,每個進程內開啟多個線程,每個線程內開啟多個協程。
2,協程指的是單個線程,因而一旦協程出現阻塞,將會阻塞整個線程。
總結協程的特點:
1,必須在只有一個單線程裏實現並發。
2,修改共享數據不需要加鎖。
3,用戶程序裏自己保存多個控制流的上下文棧。
4,一個協程遇到 io 操作自動切換到其他協程
Greenlet模塊:
安裝:pip3 install greenlet
import time from greenlet import greenlet def func1(name): print(‘%s‘%name,123) g2.switch(‘小白‘) time.sleep(1) print(‘%s‘%name,‘abc‘) def func2(name): time.sleep(1) print(‘%s‘%name,456) g1.switch() g1 = greenlet(func1) # 實例化 g2 = greenlet(func2) g1.switch(‘清秋‘) # 開始運行 可以在第一次switch時傳入參數,以後就不用了。 ‘‘‘ 清秋 123 小白 456 清秋 abc ‘‘‘greenlet實現狀態切換
單純的切換(在沒有io的情況下或者沒有重復開辟內存空間的操作),反而會降低程序的執行速度。
# 順序執行 import time def f1(): res = 1 for i in range(10000000): res += i def f2(): res = 1 for i in range(10000000): res *= i start = time.time() f1() f2() print(time.time()-start) # 1.5120854377746582 # 切換執行 from greenlet import greenlet import time def f1(): res = 1 for i in range(10000000): res += i g2.switch() def f2(): res = 1 for i in range(10000000): res *= i g1.switch() start = time.time() g1 = greenlet(f1) g2 = greenlet(f2) g1.switch() print(time.time()-start) # 1.9758000373840332 # 由上可知,單純的切換,反而會降低了程序的執行速度。效率對比
greenlet 只是提供了一種比generator更加便捷的切換方式,當切到一個任務執行時,如果遇到io,那就原地阻塞,仍然是沒有解決遇到IO自動切換來提升效率的問題。
單線程裏的這20個任務的代碼通常會既有計算操作,又有阻塞操作,所以我們可以在這些時間去執行其他任務,這樣就能提高效率,這就用到了gevent模塊。
Gevent模塊:
安裝:pip3 install gevent
gevent 是一個第三方庫,可以輕松通過gevent實現並發同步或異步編程,在gevent中用到的主要模式是greenlet,它是以C擴展模塊形式介入Python的輕量級協程。greenlet全部運行在主程序操作系統進程的內部,但他們被協作式的調度。
g1=gevent.spawn(func,1,,2,3,x=4,y=5)創建一個協程對象g1,spawn括號內第一個參數是函數名,如eat,後面可以有多個參數,可以是位置實參或關鍵字實參,都是傳給函數eat的 g2=gevent.spawn(func2) g1.join() #等待g1結束 g2.join() #等待g2結束 #或者上述兩步合作一步:gevent.joinall([g1,g2]) g1.value#拿到func1的返回值用法介紹
from gevent import monkey;monkey.patch_all() # 它會把下面導入的所有模塊中的IO操作都打成一個包,gevent就能夠識別這些IO操作了。 import time import gevent # 使用gevent模塊來執行多個函數,表示在這些函數遇到IO操作的時候可以在同一個線程中進行切換。 # 利用其他任務的IO阻塞時間來切換到其他的任務繼續執行。 # spawn來發布協程任務 # gevent本身並不認識其他模塊中的IO操作,所以只有 from gevent import monkey;monkey.patch_all() 才能識別 # gevent就能夠認是在這句話後導入模塊的IO操作。 from threading import currentThread def eat(): print(‘eating1‘,currentThread()) time.sleep(1) print(‘eating2‘) def play(): print(‘playing1‘,currentThread()) time.sleep(1) print(‘playing2‘) g1 = gevent.spawn(eat) g2 = gevent.spawn(play) time.sleep(1) # 停一會等待執行完畢例子
from gevent import monkey;monkey.patch_all() import time import gevent def eat(name): print(‘%s eat1‘ % name) time.sleep(1) print(‘%s eat2‘ % name) def play(name): print(‘%s play1‘ % name) time.sleep(1) print(‘%s play2‘ % name) g1 = gevent.spawn(eat,‘egon‘) g2 = gevent.spawn(play,‘alex‘) g1.join() g2.join() # 可以直接用 gevent.joinall([g1,g2]) print(‘主‘)遇到IO自動切換
from gevent import monkey;monkey.patch_all() import threading import gevent import time def eat(): print(threading.current_thread().getName()) # DummyThread-1 print(‘eat1‘) time.sleep(2) print(‘eat2‘) def play(): print(threading.current_thread().getName()) # DummyThread-2 print(‘play1‘) time.sleep(2) print(‘play2‘) g1 = gevent.spawn(eat) g2 = gevent.spawn(play) gevent.joinall([g1,g2]) print(‘主‘)查看threading.current_thread().getName()
gevent應用舉例:
from gevent import monkey;monkey.patch_all() import time import gevent from urllib.request import urlopen def get_page(url): res = urlopen(url) print(len(res.read())) url_lst = [ ‘http://www.baidu.com‘, ‘http://www.sogou.com‘, ‘http://www.sohu.com‘, ‘http://www.qq.com‘, ‘http://www.cnblogs.com‘, ] start = time.time() gevent.joinall([gevent.spawn(get_page,url) for url in url_lst]) print(time.time()-start) # 1.0084402561187744 # 網頁讀取有一個機制,第一次讀取的時候時間都會普遍久 # 會將讀取的網頁緩存下來,以便下次的讀取用。 start = time.time() gevent.joinall([gevent.spawn(get_page,url) for url in url_lst]) print(time.time()-start) # 0.5516667366027832 start = time.time() for url in url_lst: get_page(url) print(time.time()-start) # 1.533193588256836 # 所以我們可以通過時間看出,協程爬取是比普通遍歷快很多。協程應用,爬蟲
通過協程實現單線程下的socket並發:
from gevent import monkey;monkey.patch_all() import socket import gevent def async_talk(conn): try: while True: conn.send(b‘hello‘) ret = conn.recv(1024) print(ret) # 為了實現能夠一直和同一個客戶端聊天。 finally: conn.close() # 在程序報錯的時候會關閉連接,節省空間 sk = socket.socket() sk.bind((‘127.0.0.1‘,9000)) sk.listen() while True: conn,addr = sk.accept() # 因為循環,所以可重復接收多個多個客戶端的連接 gevent.spawn(async_talk,conn) # 創建協程,將conn當參數傳入函數 sk.close()server
import socket from threading import Thread def chat(): sk = socket.socket() # 放在函數內部,則每次一個線程就會有一個新的sk。 sk.connect((‘127.0.0.1‘,9000)) while True: # 循環對話。 print(sk.recv(1024)) sk.send(b‘bye‘) sk.close() for i in range(500): # 創建500個線程客戶端 Thread(target=chat).start()client
並發編程——協程