Python並發編程(線程隊列,協程,Greenlet,Gevent)
線程隊列
線程之間的通信我們列表行不行呢,當然行,那麽隊列和列表有什麽區別呢?
queue隊列 :使用import queue,用法與進程Queue一樣
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
- class
queue.
Queue
(maxsize=0) #先進先出
import queue #不需要通過threading模塊裏面導入,直接import queue就可以了,這是python自帶的#用法基本和我們進程multiprocess中的queue是一樣的 q=queue.Queue() q.put(‘first‘) q.put(‘second‘) q.put(‘third‘) # q.put_nowait() #沒有數據就報錯,可以通過try來搞 print(q.get()) print(q.get()) print(q.get()) # q.get_nowait() #沒有數據就報錯,可以通過try來搞 ‘‘‘ 結果(先進先出): first second third ‘‘‘ 先進先出示例代碼
class queue.
LifoQueue
(maxsize=0) #last in fisrt out
import queue q=queue.LifoQueue() #隊列,類似於棧,棧我們提過嗎,是不是先進後出的順序啊 q.put(‘first‘) q.put(‘second‘) q.put(‘third‘) # q.put_nowait() print(q.get()) print(q.get()) print(q.get()) # q.get_nowait() ‘‘‘ 結果(後進先出): third second first ‘‘‘ 先進後出示例代碼
class queue.
PriorityQueue
(maxsize=0) #存儲數據時可設置優先級的隊列
import queue q=queue.PriorityQueue() #put進入一個元組,元組的第一個元素是優先級(通常是數字,也可以是非數字之間的比較),數字越小優先級越高 q.put((-10,‘a‘)) q.put((-5,‘a‘)) #負數也可以 # q.put((20,‘ws‘)) #如果兩個值的優先級一樣,那麽按照後面的值的acsii碼順序來排序,如果字符串第一個數元素相同,比較第二個元素的acsii碼順序 # q.put((20,‘wd‘)) # q.put((20,{‘a‘:11})) #TypeError: unorderable types: dict() < dict() 不能是字典 # q.put((20,(‘w‘,1))) #優先級相同的兩個數據,他們後面的值必須是相同的數據類型才能比較,可以是元祖,也是通過元素的ascii碼順序來排序 q.put((20,‘b‘)) q.put((20,‘a‘)) q.put((0,‘b‘)) q.put((30,‘c‘)) print(q.get()) print(q.get()) print(q.get()) print(q.get()) print(q.get()) print(q.get()) ‘‘‘ 結果(數字越小優先級越高,優先級高的優先出隊): ‘‘‘ 優先級隊列示例代碼
這三種隊列都是線程安全的,不會出現多個線程搶占同一個資源或數據的情況。
協程介紹
協程:是單線程下的並發,又稱微線程,纖程。英文名Coroutine。一句話說明什麽是線程:協程是一種用戶態的輕量級線程,即協程是由用戶程序自己控制調度的。、
需要強調的是:
#1. python的線程屬於內核級別的,即由操作系統控制調度(如單線程遇到io或執行時間過長就會被迫交出cpu執行權限,切換其他線程運行) #2. 單線程內開啟協程,一旦遇到io,就會從應用程序級別(而非操作系統)控制切換,以此來提升效率(!!!非io操作的切換與效率無關)
對比操作系統控制線程的切換,用戶在單線程內控制協程的切換
優點如下:
#1. 協程的切換開銷更小,屬於程序級別的切換,操作系統完全感知不到,因而更加輕量級 #2. 單線程內就可以實現並發的效果,最大限度地利用cpu
缺點如下:
#1. 協程的本質是單線程下,無法利用多核,可以是一個程序開啟多個進程,每個進程內開啟多個線程,每個線程內開啟協程 #2. 協程指的是單個線程,因而一旦協程出現阻塞,將會阻塞整個線程
總結協程特點:
- 必須在只有一個單線程裏實現並發
- 修改共享數據不需加鎖
- 用戶程序裏自己保存多個控制流的上下文棧
- 附加:一個協程遇到IO操作自動切換到其它協程(如何實現檢測IO,yield、greenlet都無法實現,就用到了gevent模塊(select機制))
Greenlet
如果我們在單個線程內有20個任務,要想實現在多個任務之間切換,使用yield生成器的方式過於麻煩(需要先得到初始化一次的生成器,然後再調用send。。。非常麻煩),而使用greenlet模塊可以非常簡單地實現這20個任務直接的切換
#安裝 pip3 install greenlet
#真正的協程模塊就是使用greenlet完成的切換 from greenlet import greenlet def eat(name): print(‘%s eat 1‘ %name) #2 g2.switch(‘taibai‘) #3 print(‘%s eat 2‘ %name) #6 g2.switch() #7 def play(name): print(‘%s play 1‘ %name) #4 g1.switch() #5 print(‘%s play 2‘ %name) #8 g1=greenlet(eat) g2=greenlet(play) g1.switch(‘taibai‘)#可以在第一次switch時傳入參數,以後都不需要 1
單純的切換(在沒有io的情況下或者沒有重復開辟內存空間的操作),反而會降低程序的執行速度
#順序執行 import time def f1(): res=1 for i in range(100000000): res+=i def f2(): res=1 for i in range(100000000): res*=i start=time.time() f1() f2() stop=time.time() print(‘run time is %s‘ %(stop-start)) #10.985628366470337 #切換 from greenlet import greenlet import time def f1(): res=1 for i in range(100000000): res+=i g2.switch() def f2(): res=1 for i in range(100000000): res*=i g1.switch() start=time.time() g1=greenlet(f1) g2=greenlet(f2) g1.switch() stop=time.time() print(‘run time is %s‘ %(stop-start)) # 52.763017892837524 效率對比
greenlet只是提供了一種比generator更加便捷的切換方式,當切到一個任務執行時如果遇到io,那就原地阻塞,仍然是沒有解決遇到IO自動切換來提升效率的問題。
上面這個圖,是協程真正的意義,雖然沒有規避固有的I/O時間,但是我們使用這個時間來做別的事情了,一般在工作中我們都是進程+線程+協程的方式來實現並發,以達到最好的並發效果,如果是4核的cpu,一般起5個進程,每個進程中20個線程(5倍cpu數量),每個線程可以起500個協程,大規模爬取頁面的時候,等待網絡延遲的時間的時候,我們就可以用協程去實現並發。 並發數量 = 5 * 20 * 500 = 50000個並發,這是一般一個4cpu的機器最大的並發數。nginx在負載均衡的時候最大承載量就是5w個
單線程裏的這20個任務的代碼通常會既有計算操作又有阻塞操作,我們完全可以在執行任務1時遇到阻塞,就利用阻塞的時間去執行任務2。。。。如此,才能提高效率,這就用到了Gevent模塊。
Gevent介紹
#安裝 pip3 install gevent
Gevent 是一個第三方庫,可以輕松通過gevent實現並發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet全部運行在主程序操作系統進程的內部,但它們被協作式地調度。
#用法 g1=gevent.spawn(func,1,2,3,x=4,y=5)創建一個協程對象g1,spawn括號內第一個參數是函數名,如eat,後面可以有多個參數,可以是位置實參或關鍵字實參,都是傳給函數eat的,spawn是異步提交任務 g2=gevent.spawn(func2) g1.join() #等待g1結束,上面只是創建協程對象,這個join才是去執行 g2.join() #等待g2結束 有人測試的時候會發現,不寫第二個join也能執行g2,是的,協程幫你切換執行了,但是你會發現,如果g2裏面的任務執行的時間長,但是不寫join的話,就不會執行完等到g2剩下的任務了 #或者上述兩步合作一步:gevent.joinall([g1,g2]) g1.value#拿到func1的返回值
遇到IO阻塞時會自動切換任務
import gevent def eat(name): print(‘%s eat 1‘ %name) gevent.sleep(2) print(‘%s eat 2‘ %name) def play(name): print(‘%s play 1‘ %name) gevent.sleep(1) print(‘%s play 2‘ %name) g1=gevent.spawn(eat,‘egon‘) g2=gevent.spawn(play,name=‘egon‘) g1.join() g2.join() #或者gevent.joinall([g1,g2]) print(‘主‘) 遇到I/O切換
上例gevent.sleep(2)模擬的是gevent可以識別的io阻塞,
而time.sleep(2)或其他的阻塞,gevent是不能直接識別的需要用下面一行代碼,打補丁,就可以識別了
from gevent import monkey;monkey.patch_all()必須放到被打補丁者的前面,如time,socket模塊之前
或者我們幹脆記憶成:要用gevent,需要將from gevent import monkey;monkey.patch_all()放到文件的開頭
from gevent import monkey;monkey.patch_all() #必須寫在最上面,這句話後面的所有阻塞全部能夠識別了 import gevent #直接導入即可 import time def eat(): #print() print(‘eat food 1‘) time.sleep(2) #加上mokey就能夠識別到time模塊的sleep了 print(‘eat food 2‘) def play(): print(‘play 1‘) time.sleep(1) #來回切換,直到一個I/O的時間結束,這裏都是我們個gevent做得,不再是控制不了的操作系統了。 print(‘play 2‘) g1=gevent.spawn(eat) g2=gevent.spawn(play_phone) gevent.joinall([g1,g2]) print(‘主‘)
我們可以用threading.current_thread().getName()來查看每個g1和g2,查看的結果為DummyThread-n,即假線程,虛擬線程,其實都在一個線程裏面
進程線程的任務切換是由操作系統自行切換的,你自己不能控制
協程是通過自己的程序(代碼)來進行切換的,自己能夠控制,只有遇到協程模塊能夠識別的IO操作的時候,程序才會進行任務切換,實現並發效果,如果所有程序都沒有IO操作,那麽就基本屬於串行執行了。
Gevent之同步與異步
from gevent import spawn,joinall,monkey;monkey.patch_all() import time def task(pid): """ Some non-deterministic task """ time.sleep(0.5) print(‘Task %s done‘ % pid) def synchronous(): for i in range(10): task(i) def asynchronous(): g_l=[spawn(task,i) for i in range(10)] joinall(g_l) if __name__ == ‘__main__‘: print(‘Synchronous:‘) synchronous() print(‘Asynchronous:‘) asynchronous() #上面程序的重要部分是將task函數封裝到Greenlet內部線程的gevent.spawn。 初始化的greenlet列表存放在數組threads中,此數組被傳給gevent.joinall 函數,後者阻塞當前流程,並執行所有給定的greenlet。執行流程只會在 所有greenlet執行完後才會繼續向下走。 協程:同步異步對比
Gevent之應用舉例一
from gevent import monkey;monkey.patch_all() import gevent import requests import time def get_page(url): print(‘GET: %s‘ %url) response=requests.get(url) if response.status_code == 200: print(‘%d bytes received from %s‘ %(len(response.text),url)) start_time=time.time() gevent.joinall([ gevent.spawn(get_page,‘https://www.python.org/‘), gevent.spawn(get_page,‘https://www.yahoo.com/‘), gevent.spawn(get_page,‘https://github.com/‘), ]) stop_time=time.time() print(‘run time is %s‘ %(stop_time-start_time)) 協程應用:爬蟲
將上面的程序最後加上一段串行的代碼看看效率:如果你的程序不需要太高的效率,那就不用什麽並發啊協程啊之類的東西。
print(‘--------------------------------‘) s = time.time() requests.get(‘https://www.python.org/‘) requests.get(‘https://www.yahoo.com/‘) requests.get(‘https://github.com/‘) t = time.time() print(‘串行時間>>‘,t-s)
Gevent之應用舉例二
通過gevent實現單線程下的socket並發(from gevent import monkey;monkey.patch_all()一定要放到導入socket模塊之前,否則gevent無法識別socket的阻塞)
一個網絡請求裏面經過多個時間延遲time
from gevent import monkey;monkey.patch_all() from socket import * import gevent #如果不想用money.patch_all()打補丁,可以用gevent自帶的socket # from gevent import socket # s=socket.socket() def server(server_ip,port): s=socket(AF_INET,SOCK_STREAM) s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) s.bind((server_ip,port)) s.listen(5) while True: conn,addr=s.accept() gevent.spawn(talk,conn,addr) def talk(conn,addr): try: while True: res=conn.recv(1024) print(‘client %s:%s msg: %s‘ %(addr[0],addr[1],res)) conn.send(res.upper()) except Exception as e: print(e) finally: conn.close() if __name__ == ‘__main__‘: server(‘127.0.0.1‘,8080) 服務端
from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect((‘127.0.0.1‘,8080)) while True: msg=input(‘>>: ‘).strip() if not msg:continue client.send(msg.encode(‘utf-8‘)) msg=client.recv(1024) 客戶端
from threading import Thread from socket import * import threading def client(server_ip,port): c=socket(AF_INET,SOCK_STREAM) #套接字對象一定要加到函數內,即局部名稱空間內,放在函數外則被所有線程共享,則大家公用一個套接字對象,那麽客戶端端口永遠一樣了 c.connect((server_ip,port)) count=0 while True: c.send((‘%s say hello %s‘ %(threading.current_thread().getName(),count)).encode(‘utf-8‘)) msg=c.recv(1024) print(msg.decode(‘utf-8‘)) count+=1 if __name__ == ‘__main__‘: for i in range(500): t=Thread(target=client,args=(‘127.0.0.1‘,8080)) t.start() 多線程並發多個客戶端,去請求上面的服務端是沒問題的
上面代碼的服務端用gevent的時候為什麽沒有用join就執行了。
Python並發編程(線程隊列,協程,Greenlet,Gevent)