線程進階之線程隊列、線程池和協程
本節目錄:
1.線程隊列
2.線程池
3.協程
一、線程隊列
線程之間的通信我們列表行不行呢,當然行,那麽隊列和列表有什麽區別呢?
queue隊列 :使用import queue,用法與進程Queue一樣
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
- class
queue.
Queue
(maxsize=0) #先進先出
import queue #不需要通過threading模塊裏面導入,直接import queue就可以了,這是python自帶的先進先出示例代碼#用法基本和我們進程multiprocess中的queue是一樣的 q=queue.Queue() q.put(‘first‘) q.put(‘second‘) q.put(‘third‘) # q.put_nowait() #沒有數據就報錯,可以通過try來搞 print(q.get()) print(q.get()) print(q.get()) # q.get_nowait() #沒有數據就報錯,可以通過try來搞 ‘‘‘ 結果(先進先出): first second third ‘‘‘
class queue.
LifoQueue
(maxsize=0) #last in fisrt out
import queue q=queue.LifoQueue() #隊列,類似於棧,棧我們提過嗎,是不是先進後出的順序啊 q.put(‘first‘) q.put(‘second‘) q.put(‘third‘) # q.put_nowait() print(q.get()) print(q.get()) print(q.get()) # q.get_nowait() ‘‘‘ 結果(後進先出): third second first ‘‘‘先進後出示例代碼
class queue.
PriorityQueue
(maxsize=0) #存儲數據時可設置優先級的隊列
import優先級隊列示例代碼queue q=queue.PriorityQueue() #put進入一個元組,元組的第一個元素是優先級(通常是數字,也可以是非數字之間的比較),數字越小優先級越高 q.put((-10,‘a‘)) q.put((-5,‘a‘)) #負數也可以 # q.put((20,‘ws‘)) #如果兩個值的優先級一樣,那麽按照後面的值的acsii碼順序來排序,如果字符串第一個數元素相同,比較第二個元素的acsii碼順序 # q.put((20,‘wd‘)) # q.put((20,{‘a‘:11})) #TypeError: unorderable types: dict() < dict() 不能是字典 # q.put((20,(‘w‘,1))) #優先級相同的兩個數據,他們後面的值必須是相同的數據類型才能比較,可以是元祖,也是通過元素的ascii碼順序來排序 q.put((20,‘b‘)) q.put((20,‘a‘)) q.put((0,‘b‘)) q.put((30,‘c‘)) print(q.get()) print(q.get()) print(q.get()) print(q.get()) print(q.get()) print(q.get()) ‘‘‘ 結果(數字越小優先級越高,優先級高的優先出隊): ‘‘‘
這三種隊列都是線程安全的,不會出現多個線程搶占同一個資源或數據的情況。
二、線程池
Python標準模塊——concurrent.futures
到這裏就差我們的線程池沒有遇到了,我們用一個新的模塊給大家講,早期的時候我們沒有線程池,現在python提供了一個新的標準或者說內置的模塊,這個模塊裏面提供了新的線程池和進程池,之前我們說的進程池是在multiprocessing裏面的,現在這個在這個新的模塊裏面,他倆用法上是一樣的。
為什麽要將進程池和線程池放到一起呢,是為了統一使用方式,使用threadPollExecutor和ProcessPollExecutor的方式一樣,而且只要通過這個concurrent.futures導入就可以直接用他們兩個了
concurrent.futures模塊提供了高度封裝的異步調用接口 ThreadPoolExecutor:線程池,提供異步調用 ProcessPoolExecutor: 進程池,提供異步調用 Both implement the same interface, which is defined by the abstract Executor class. #2 基本方法 #submit(fn, *args, **kwargs) 異步提交任務 #map(func, *iterables, timeout=None, chunksize=1) 取代for循環submit的操作 #shutdown(wait=True) 相當於進程池的pool.close()+pool.join()操作 wait=True,等待池內所有任務執行完畢回收完資源後才繼續 wait=False,立即返回,並不會等待池內的任務執行完畢 但不管wait參數為何值,整個程序都會等到所有任務執行完畢 submit和map必須在shutdown之前 #result(timeout=None) 取得結果 #add_done_callback(fn) 回調函數
import time import os import threading from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor def func(n): time.sleep(2) print(‘%s打印的:‘%(threading.get_ident()),n) return n*n tpool = ThreadPoolExecutor(max_workers=5) #默認一般起線程的數據不超過CPU個數*5 # tpool = ProcessPoolExecutor(max_workers=5) #進程池的使用只需要將上面的ThreadPoolExecutor改為ProcessPoolExecutor就行了,其他都不用改 #異步執行 t_lst = [] for i in range(5): t = tpool.submit(func,i) #提交執行函數,返回一個結果對象,i作為任務函數的參數 def submit(self, fn, *args, **kwargs): 可以傳任意形式的參數 t_lst.append(t) # # print(t.result()) #這個返回的結果對象t,不能直接去拿結果,不然又變成串行了,可以理解為拿到一個號碼,等所有線程的結果都出來之後,我們再去通過結果對象t獲取結果 tpool.shutdown() #起到原來的close阻止新任務進來 + join的作用,等待所有的線程執行完畢 print(‘主線程‘) for ti in t_lst: print(‘>>>>‘,ti.result()) # 我們還可以不用shutdown(),用下面這種方式 # while 1: # for n,ti in enumerate(t_lst): # print(‘>>>>‘, ti.result(),n) # time.sleep(2) #每個兩秒去去一次結果,哪個有結果了,就可以取出哪一個,想表達的意思就是說不用等到所有的結果都出來再去取,可以輪詢著去取結果,因為你的任務需要執行的時間很長,那麽你需要等很久才能拿到結果,通過這樣的方式可以將快速出來的結果先拿出來。如果有的結果對象裏面還沒有執行結果,那麽你什麽也取不到,這一點要註意,不是空的,是什麽也取不到,那怎麽判斷我已經取出了哪一個的結果,可以通過枚舉enumerate來搞,記錄你是哪一個位置的結果對象的結果已經被取過了,取過的就不再取了 #結果分析: 打印的結果是沒有順序的,因為到了func函數中的sleep的時候線程會切換,誰先打印就沒準兒了,但是最後的我們通過結果對象取結果的時候拿到的是有序的,因為我們主線程進行for循環的時候,我們是按順序將結果對象添加到列表中的。 # 37220打印的: 0 # 32292打印的: 4 # 33444打印的: 1 # 30068打印的: 2 # 29884打印的: 3 # 主線程 # >>>> 0 # >>>> 1 # >>>> 4 # >>>> 9 # >>>> 16ThreadPoolExecutor的簡單使用
ThreadPoolExecutor的使用:
只需要將這一行代碼改為下面這一行就可以了,其他的代碼都不用變 tpool = ThreadPoolExecutor(max_workers=5) #默認一般起線程的數據不超過CPU個數*5 # tpool = ProcessPoolExecutor(max_workers=5) 你就會發現為什麽將線程池和進程池都放到這一個模塊裏面了,用法一樣
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import threading import os,time,random def task(n): print(‘%s is runing‘ %threading.get_ident()) time.sleep(random.randint(1,3)) return n**2 if __name__ == ‘__main__‘: executor=ThreadPoolExecutor(max_workers=3) # for i in range(11): # future=executor.submit(task,i) s = executor.map(task,range(1,5)) #map取代了for+submit print([i for i in s])map的使用
import time import os import threading from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor def func(n): time.sleep(2) return n*n def call_back(m): print(‘結果為:%s‘%(m.result())) tpool = ThreadPoolExecutor(max_workers=5) t_lst = [] for i in range(5): t = tpool.submit(func,i).add_done_callback(call_back)回調函數簡單應用
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor from multiprocessing import Pool import requests import json import os def get_page(url): print(‘<進程%s> get %s‘ %(os.getpid(),url)) respone=requests.get(url) if respone.status_code == 200: return {‘url‘:url,‘text‘:respone.text} def parse_page(res): res=res.result() print(‘<進程%s> parse %s‘ %(os.getpid(),res[‘url‘])) parse_res=‘url:<%s> size:[%s]\n‘ %(res[‘url‘],len(res[‘text‘])) with open(‘db.txt‘,‘a‘) as f: f.write(parse_res) if __name__ == ‘__main__‘: urls=[ ‘https://www.baidu.com‘, ‘https://www.python.org‘, ‘https://www.openstack.org‘, ‘https://help.github.com/‘, ‘http://www.sina.com.cn/‘ ] # p=Pool(3) # for url in urls: # p.apply_async(get_page,args=(url,),callback=pasrse_page) # p.close() # p.join() p=ProcessPoolExecutor(3) for url in urls: p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一個future對象obj,需要用obj.result()拿到結果回調函數的應用
三、協程
線程實現並發的最小單位
並發:記錄狀態+切換
1.生成器版生成器(僅僅是模仿了下大概的思路,實質沒有節省資源)
import time def f1(): for i in range(10): time.sleep(0.5) print(‘f1>>‘,i) yield def f2(): g = f1() for i in range(10): time.sleep(0.5) print(‘f2>>‘, i) next(g) f1() f2()生成器版協程
2.greenlet版協程
大概和生成器版差不多,兩個方法來回切換。偽協程!
import time from greenlet import greenlet def f1(s): print(‘第一次f1‘+s) g2.switch(‘taibai‘) #切換到g2這個對象的任務去執行 time.sleep(1) print(‘第二次f1‘+s) g2.switch() def f2(s): print(‘第一次f2‘+s) g1.switch() time.sleep(1) print(‘第二次f2‘+s) g1 = greenlet(f1) #實例化一個greenlet對象,並將任務名稱作為參數參進去 g2 = greenlet(f2) g1.switch(‘alex‘) #執行g1對象裏面的任務greenlet版的協程
3.gevent版協程(真正的協程)
import gevent import time def f1(): print("第一次f1") gevent.sleep(1) print("第二次f1") def f2(): print("第一次f2") gevent.sleep(2) print("第二次f2") s = time.time() g1 = gevent.spawn(f1) #異步提交了f1任務 g2 = gevent.spawn(f2) #異步提交了f2任務 g1.join() g2.join() e = time.time() print("執行時間:",e-s) print("主程序任務")gevent版協程(不完美版)
大家會發現一個問題就是只能使用gevent.sleep來代替time.sleep。還有就是要g1.join()和g2.join()有些麻煩對不對,下面就是協程gevent版的升級版。
import gevent import time from gevent import monkey;monkey.patch_all() #可以接收所有的I/O def f1(): print("第一次f1") time.sleep(1) print("第二次f1") def f2(): print("第一次f2") time.sleep(2) print("第二次f2") s = time.time() g1 = gevent.spawn(f1) #異步提交了f1任務 g2 = gevent.spawn(f2) #異步提交了f2任務 gevent.joinall([g1,g2]) #一個列表裏面是任務名等同於g1.join()和g2.join() e = time.time() print("執行時間:",e-s) print("主程序任務")gevent版協程(升級版)
線程進階之線程隊列、線程池和協程