執行緒進階之執行緒佇列、執行緒池和協程
本節目錄:
1.執行緒佇列
2.執行緒池
3.協程
一、執行緒佇列
執行緒之間的通訊我們列表行不行呢,當然行,那麼佇列和列表有什麼區別呢?
queue佇列 :使用import queue,用法與程序Queue一樣
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
-
class
queue.
Queue
(maxsize=0) #先進先出
import queue #不需要通過threading模組裡面匯入,直接import queue就可以了,這是python自帶的 #用法基本和我們程序multiprocess中的queue是一樣的 q=queue.Queue() q.put('first') q.put('second') q.put('third') # q.put_nowait() #沒有資料就報錯,可以通過try來搞 print(q.get()) print(q.get()) print(q.get()) # q.get_nowait() #沒有資料就報錯,可以通過try來搞 ''' 結果(先進先出): first second third先進先出示例程式碼'''
class queue.
LifoQueue
(maxsize=0) #last in fisrt out
import queue q=queue.LifoQueue() #佇列,類似於棧,棧我們提過嗎,是不是先進後出的順序啊 q.put('first') q.put('second') q.put('third') # q.put_nowait() print(q.get()) print(q.get()) print(q.get()) # q.get_nowait() ''' 結果(後進先出): third second first先進後出示例程式碼'''
class queue.
PriorityQueue
(maxsize=0) #儲存資料時可設定優先順序的佇列
import queue q=queue.PriorityQueue() #put進入一個元組,元組的第一個元素是優先順序(通常是數字,也可以是非數字之間的比較),數字越小優先順序越高 q.put((-10,'a')) q.put((-5,'a')) #負數也可以 # q.put((20,'ws')) #如果兩個值的優先順序一樣,那麼按照後面的值的acsii碼順序來排序,如果字串第一個數元素相同,比較第二個元素的acsii碼順序 # q.put((20,'wd')) # q.put((20,{'a':11})) #TypeError: unorderable types: dict() < dict() 不能是字典 # q.put((20,('w',1))) #優先順序相同的兩個資料,他們後面的值必須是相同的資料型別才能比較,可以是元祖,也是通過元素的ascii碼順序來排序 q.put((20,'b')) q.put((20,'a')) q.put((0,'b')) q.put((30,'c')) print(q.get()) print(q.get()) print(q.get()) print(q.get()) print(q.get()) print(q.get()) ''' 結果(數字越小優先順序越高,優先順序高的優先出隊): '''優先順序佇列示例程式碼
這三種佇列都是執行緒安全的,不會出現多個執行緒搶佔同一個資源或資料的情況。
二、執行緒池
Python標準模組——concurrent.futures
到這裡就差我們的執行緒池沒有遇到了,我們用一個新的模組給大家講,早期的時候我們沒有執行緒池,現在python提供了一個新的標準或者說內建的模組,這個模組裡面提供了新的執行緒池和程序池,之前我們說的程序池是在multiprocessing裡面的,現在這個在這個新的模組裡面,他倆用法上是一樣的。
為什麼要將程序池和執行緒池放到一起呢,是為了統一使用方式,使用threadPollExecutor和ProcessPollExecutor的方式一樣,而且只要通過這個concurrent.futures匯入就可以直接用他們兩個了
concurrent.futures模組提供了高度封裝的非同步呼叫介面 ThreadPoolExecutor:執行緒池,提供非同步呼叫 ProcessPoolExecutor: 程序池,提供非同步呼叫 Both implement the same interface, which is defined by the abstract Executor class. #2 基本方法 #submit(fn, *args, **kwargs) 非同步提交任務 #map(func, *iterables, timeout=None, chunksize=1) 取代for迴圈submit的操作 #shutdown(wait=True) 相當於程序池的pool.close()+pool.join()操作 wait=True,等待池內所有任務執行完畢回收完資源後才繼續 wait=False,立即返回,並不會等待池內的任務執行完畢 但不管wait引數為何值,整個程式都會等到所有任務執行完畢 submit和map必須在shutdown之前 #result(timeout=None) 取得結果 #add_done_callback(fn) 回撥函式
import time import os import threading from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor def func(n): time.sleep(2) print('%s列印的:'%(threading.get_ident()),n) return n*n tpool = ThreadPoolExecutor(max_workers=5) #預設一般起執行緒的資料不超過CPU個數*5 # tpool = ProcessPoolExecutor(max_workers=5) #程序池的使用只需要將上面的ThreadPoolExecutor改為ProcessPoolExecutor就行了,其他都不用改 #非同步執行 t_lst = [] for i in range(5): t = tpool.submit(func,i) #提交執行函式,返回一個結果物件,i作為任務函式的引數 def submit(self, fn, *args, **kwargs): 可以傳任意形式的引數 t_lst.append(t) # # print(t.result()) #這個返回的結果物件t,不能直接去拿結果,不然又變成串行了,可以理解為拿到一個號碼,等所有執行緒的結果都出來之後,我們再去通過結果物件t獲取結果 tpool.shutdown() #起到原來的close阻止新任務進來 + join的作用,等待所有的執行緒執行完畢 print('主執行緒') for ti in t_lst: print('>>>>',ti.result()) # 我們還可以不用shutdown(),用下面這種方式 # while 1: # for n,ti in enumerate(t_lst): # print('>>>>', ti.result(),n) # time.sleep(2) #每個兩秒去去一次結果,哪個有結果了,就可以取出哪一個,想表達的意思就是說不用等到所有的結果都出來再去取,可以輪詢著去取結果,因為你的任務需要執行的時間很長,那麼你需要等很久才能拿到結果,通過這樣的方式可以將快速出來的結果先拿出來。如果有的結果物件裡面還沒有執行結果,那麼你什麼也取不到,這一點要注意,不是空的,是什麼也取不到,那怎麼判斷我已經取出了哪一個的結果,可以通過列舉enumerate來搞,記錄你是哪一個位置的結果物件的結果已經被取過了,取過的就不再取了 #結果分析: 列印的結果是沒有順序的,因為到了func函式中的sleep的時候執行緒會切換,誰先列印就沒準兒了,但是最後的我們通過結果物件取結果的時候拿到的是有序的,因為我們主執行緒進行for迴圈的時候,我們是按順序將結果物件新增到列表中的。 # 37220列印的: 0 # 32292列印的: 4 # 33444列印的: 1 # 30068列印的: 2 # 29884列印的: 3 # 主執行緒 # >>>> 0 # >>>> 1 # >>>> 4 # >>>> 9 # >>>> 16ThreadPoolExecutor的簡單使用
ThreadPoolExecutor的使用:
只需要將這一行程式碼改為下面這一行就可以了,其他的程式碼都不用變 tpool = ThreadPoolExecutor(max_workers=5) #預設一般起執行緒的資料不超過CPU個數*5 # tpool = ProcessPoolExecutor(max_workers=5) 你就會發現為什麼將執行緒池和程序池都放到這一個模組裡面了,用法一樣
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import threading import os,time,random def task(n): print('%s is runing' %threading.get_ident()) time.sleep(random.randint(1,3)) return n**2 if __name__ == '__main__': executor=ThreadPoolExecutor(max_workers=3) # for i in range(11): # future=executor.submit(task,i) s = executor.map(task,range(1,5)) #map取代了for+submit print([i for i in s])map的使用
import time import os import threading from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor def func(n): time.sleep(2) return n*n def call_back(m): print('結果為:%s'%(m.result())) tpool = ThreadPoolExecutor(max_workers=5) t_lst = [] for i in range(5): t = tpool.submit(func,i).add_done_callback(call_back)回撥函式簡單應用
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor from multiprocessing import Pool import requests import json import os def get_page(url): print('<程序%s> get %s' %(os.getpid(),url)) respone=requests.get(url) if respone.status_code == 200: return {'url':url,'text':respone.text} def parse_page(res): res=res.result() print('<程序%s> parse %s' %(os.getpid(),res['url'])) parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text'])) with open('db.txt','a') as f: f.write(parse_res) if __name__ == '__main__': urls=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] # p=Pool(3) # for url in urls: # p.apply_async(get_page,args=(url,),callback=pasrse_page) # p.close() # p.join() p=ProcessPoolExecutor(3) for url in urls: p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一個future物件obj,需要用obj.result()拿到結果回撥函式的應用
三、協程
執行緒實現併發的最小單位
併發:記錄狀態+切換
1.生成器版生成器(僅僅是模仿了下大概的思路,實質沒有節省資源)
import time def f1(): for i in range(10): time.sleep(0.5) print('f1>>',i) yield def f2(): g = f1() for i in range(10): time.sleep(0.5) print('f2>>', i) next(g) f1() f2()生成器版協程
2.greenlet版協程
大概和生成器版差不多,兩個方法來回切換。偽協程!
import time from greenlet import greenlet def f1(s): print('第一次f1'+s) g2.switch('taibai') #切換到g2這個物件的任務去執行 time.sleep(1) print('第二次f1'+s) g2.switch() def f2(s): print('第一次f2'+s) g1.switch() time.sleep(1) print('第二次f2'+s) g1 = greenlet(f1) #例項化一個greenlet物件,並將任務名稱作為引數參進去 g2 = greenlet(f2) g1.switch('alex') #執行g1物件裡面的任務greenlet版的協程
3.gevent版協程(真正的協程)
import gevent import time def f1(): print("第一次f1") gevent.sleep(1) print("第二次f1") def f2(): print("第一次f2") gevent.sleep(2) print("第二次f2") s = time.time() g1 = gevent.spawn(f1) #非同步提交了f1任務 g2 = gevent.spawn(f2) #非同步提交了f2任務 g1.join() g2.join() e = time.time() print("執行時間:",e-s) print("主程式任務")gevent版協程(不完美版)
大家會發現一個問題就是隻能使用gevent.sleep來代替time.sleep。還有就是要g1.join()和g2.join()有些麻煩對不對,下面就是協程gevent版的升級版。
import gevent import time from gevent import monkey;monkey.patch_all() #可以接收所有的I/O def f1(): print("第一次f1") time.sleep(1) print("第二次f1") def f2(): print("第一次f2") time.sleep(2) print("第二次f2") s = time.time() g1 = gevent.spawn(f1) #非同步提交了f1任務 g2 = gevent.spawn(f2) #非同步提交了f2任務 gevent.joinall([g1,g2]) #一個列表裡面是任務名等同於g1.join()和g2.join() e = time.time() print("執行時間:",e-s) print("主程式任務")gevent版協程(升級版)