並發編程---線程queue---進程池線程池---異部調用(回調機制)
阿新 • • 發佈:2018-04-24
priority close name port rand current 結果 耦合 join
線程
- 隊列:先進先出
- 堆棧:後進先出
- 優先級:數字越小優先級越大,越先輸出
import queue q = queue.Queue(3) # 先進先出-->隊列 q.put(‘first‘) q.put(2) # q.put(‘third‘) # q.put(4) #由於沒有人取走,就會卡主 q.put(4,block=False) #等同於q.get_nowait(), Ture 阻塞,Flase不阻塞,報異常滿了 # # q.put(4,block=True,timeout=3) print(q.get()) print(q.get()) print(q.get())線程queueprint(q.get(block=True,timeout=3)) # 阻塞等待3秒 沒有取走數據就報異常 # print(q.get(block=False)) #等同於q.get_nowait() # print(q.get_nowait()) q = queue.LifoQueue(3) #後進先出-->堆棧 q.put(‘first‘) q.put(2) q.put(‘third‘) print(q.get()) print(q.get()) print(q.get()) ‘‘‘ 打印結果: third 2 first ‘‘‘ q = queue.PriorityQueue(3) #優先級隊列 q.put((10,‘one‘)) q.put((40,‘two‘)) q.put((30,‘three‘)) print(q.get()) print(q.get()) print(q.get()) ‘‘‘ 數字越小優先級越高 打印結果 (10, ‘one‘) (30, ‘three‘) (40, ‘two‘) ‘‘‘
進程池線程池
- 池:是用來對進程(線程)的數量加以限制
- 進程池:計算密集型,用多進程
- 線程池:IO密集型,用多線程,例如:sockect網絡通信就應該用多線程
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor進程池|線程池import os,time,random ‘‘‘ sockect網絡通信是IO操作,所以用多線程 計算密集型:用多進程 ‘‘‘ def task(name): print(‘name:%s pid:%s run‘ %(name,os.getpid())) time.sleep(random.randint(1,3)) if __name__ == ‘__main__‘: # pool = ProcessPoolExecutor(4) # 進程池最多裝4個進程,不指定的話默認是cpu的核數 pool = ThreadPoolExecutor(5) for i in range(10): pool.submit(task,‘yang%s‘ %i) # 異步調用池子收了10個任務,但同一時間只有4個任務在進行 pool.shutdown(wait=True) # 類似join 代表往池子裏面丟任務的入口關掉 計數器-1 print(‘主‘) ‘‘‘ 打印結果: name:yang0 pid:11120 run name:yang1 pid:11120 run name:yang2 pid:11120 run name:yang3 pid:11120 run name:yang4 pid:11120 run name:yang5 pid:11120 run name:yang6 pid:11120 run name:yang7 pid:11120 run name:yang8 pid:11120 run name:yang9 pid:11120 run 主 ‘‘‘ from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor from threading import currentThread import os,time,random def task(): print(‘name:%s pid:%s run‘ %(currentThread().getName(),os.getpid())) time.sleep(random.randint(1,3)) if __name__ == ‘__main__‘: # pool = ProcessPoolExecutor(4) # 進程池最多裝4個進程,不指定的話默認是cpu的核數 pool = ThreadPoolExecutor(5) for i in range(10): pool.submit(task) # 異步調用池子收了10個任務,但同一時間只有4個任務在進行 pool.shutdown(wait=True) # 類似join 代表往池子裏面丟任務的入口關掉 計數器-1 print(‘主‘) ‘‘‘ 打印結果: name:ThreadPoolExecutor-0_0 pid:14052 run name:ThreadPoolExecutor-0_1 pid:14052 run name:ThreadPoolExecutor-0_2 pid:14052 run name:ThreadPoolExecutor-0_3 pid:14052 run name:ThreadPoolExecutor-0_4 pid:14052 run name:ThreadPoolExecutor-0_2 pid:14052 run name:ThreadPoolExecutor-0_1 pid:14052 run name:ThreadPoolExecutor-0_3 pid:14052 run name:ThreadPoolExecutor-0_4 pid:14052 run name:ThreadPoolExecutor-0_0 pid:14052 run 主 ‘‘‘
同步調用和異步調用
提交任務的兩種方式:
- 同步調用:提交完任務後,就在原地等待任務執行完畢,拿到結果,再執行下一行代碼,導致程序是串行執行
- 異步調用:提交完任務後,不在原地等待任務執行完。回調機制:自動觸發
#1.同步調用:提交完任務後,就在原地等待任務執行完畢,拿到結果,再執行下一行代碼,導致程序是串行執行 from concurrent.futures import ThreadPoolExecutor import time import random def la(name): print(‘%s is laing‘ %name) time.sleep(random.randint(3,5)) res = random.randint(7,13)*‘#‘ return {‘name‘:name,‘res‘:res} def weigh(shit): name = shit[‘name‘] size = len(shit[‘res‘]) print(‘%s 拉了 <%s>kg‘ %(name,size)) if __name__ == ‘__main__‘: pool = ThreadPoolExecutor(10) shit1 = pool.submit(la,‘alex‘).result() weigh(shit1) shit2 = pool.submit(la,‘yang‘).result() weigh(shit2) shit3 = pool.submit(la,‘hang‘).result() weigh(shit3) ‘‘‘ 打印結果: alex is laing alex 拉了 <8>kg yang is laing yang 拉了 <8>kg hang is laing hang 拉了 <7>kg ‘‘‘同步調用
#2.異步調用:提交完任務後,不在原地等待任務執行完 from concurrent.futures import ThreadPoolExecutor import time import random def la(name): print(‘%s is laing‘ %name) time.sleep(random.randint(3,5)) res = random.randint(7,13)*‘#‘ return {‘name‘:name,‘res‘:res} # weigh({‘name‘:name,‘res‘:res}) # 這樣寫,所有功能 不能體現出解耦合 def weigh(shit): shit = shit.result() # 拿到是一個對象,需要進行result() name = shit[‘name‘] size = len(shit[‘res‘]) print(‘%s 拉了 <%s>kg‘ %(name,size)) if __name__ == ‘__main__‘: pool = ThreadPoolExecutor(10) shit1 = pool.submit(la,‘alex‘).add_done_callback(weigh) shit2 = pool.submit(la,‘yang‘).add_done_callback(weigh) shit3 = pool.submit(la,‘hang‘).add_done_callback(weigh) ‘‘‘ 打印結果: alex is laing yang is laing hang is laing hang 拉了 <10>kg alex 拉了 <7>kg yang 拉了 <12>kg ‘‘‘異步調用
異步調用的應用
from concurrent.futures import ThreadPoolExecutor import requests import time def get(url): print(‘GET %s‘%url) response = requests.get(url) time.sleep(3) return {‘url‘:url,‘content‘:response.text} def parse(res): res = res.result() print(‘%s parse res is %s‘ %(res[‘url‘],len(res[‘content‘]))) if __name__ == ‘__main__‘: urls = [ ‘http://www.cnblogs.com/linhaifeng‘, ‘https://www.python.org‘, ‘https://www.openstack.org‘, ] pool = ThreadPoolExecutor(2) for url in urls: pool.submit(get,url).add_done_callback(parse) ‘‘‘ 打印結果: GET http://www.cnblogs.com/linhaifeng GET https://www.python.org http://www.cnblogs.com/linhaifeng parse res is 16320 GET https://www.openstack.org https://www.python.org parse res is 49273 https://www.openstack.org parse res is 64040 ‘‘‘應用
並發編程---線程queue---進程池線程池---異部調用(回調機制)