4月28日 python學習總結 線程與協程
阿新 • • 發佈:2018-05-06
回調 obj eat dom print 返回 ces cal 保存
一、 異步與回調機制
問題:
1、任務的返回值不能得到及時的處理,必須等到所有任務都運行完畢才能統一進行處理
2、解析的過程是串行執行的,如果解析一次需要花費2s,解析9次則需要花費18s
解決一: (線程實現異步,回調解析結果)
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor from threading import current_thread import requests import os import time import random def get(url):print(‘%s GET %s‘ %(current_thread().name,url)) response=requests.get(url) time.sleep(random.randint(1,3)) if response.status_code == 200: # 幹解析的活 return response.text def pasrse(obj): res=obj.result() print(‘%s 解析結果為:%s‘ %(current_thread().name,len(res))) if __name__== ‘__main__‘: urls=[ ‘https://www.baidu.com‘, ‘https://www.baidu.com‘, ‘https://www.baidu.com‘, ‘https://www.baidu.com‘, ‘https://www.baidu.com‘, ‘https://www.baidu.com‘, ‘https://www.baidu.com‘, ‘https://www.baidu.com‘, ‘https://www.python.org‘, ] pool=ThreadPoolExecutor(4) for url in urls: obj=pool.submit(get,url) #放入進程池,實現異步操作 obj.add_done_callback(pasrse) #回調,將線程執行結果當作參數傳遞給pasrse函數,線程是誰先空閑誰執行結果處理,不存在主次之分 print(‘主線程‘,current_thread().name)
解決二: (進程實現異步,回調解析結果)
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import requests import os import time import random def get(url): print(‘%s GET %s‘ %(os.getpid(),url)) response=requests.get(url) time.sleep(random.randint(1,3)) if response.status_code == 200: # 幹解析的活 return response.text def pasrse(obj): res=obj.result() print(‘%s 解析結果為:%s‘ %(os.getpid(),len(res))) if __name__ == ‘__main__‘: urls=[ ‘https://www.baidu.com‘, ‘https://www.baidu.com‘, ‘https://www.baidu.com‘, ‘https://www.baidu.com‘, ‘https://www.baidu.com‘, ‘https://www.baidu.com‘, ‘https://www.baidu.com‘, ‘https://www.baidu.com‘, ‘https://www.python.org‘, ] pool=ProcessPoolExecutor(4) for url in urls: obj=pool.submit(get,url) #放入進程池,實現異步操作 obj.add_done_callback(pasrse) #回調,將進程執行結果當作參數傳遞給pasrse函數,由主進程執行 print(‘主進程‘,os.getpid())
二、線程queue
import queue q=queue.Queue(3) #隊列:先進先出 q.put(1) q.put(2) q.put(3) # q.put(4) print(q.get()) print(q.get()) print(q.get()) q=queue.LifoQueue(3) #堆棧:後進先出 q.put(‘a‘) q.put(‘b‘) q.put(‘c‘) print(q.get()) print(q.get()) print(q.get()) q=queue.PriorityQueue(3) #優先級隊列:可以以小元組的形式往隊列裏存值,第一個元素代表優先級,數字越小優先級越高 q.put((10,‘user1‘)) q.put((-3,‘user2‘)) q.put((-2,‘user3‘)) print(q.get()) print(q.get()) print(q.get())
三、線程Event
from threading import Event,current_thread,Thread import time event=Event() # 監聽信號 初始值為False def check(): print(‘%s 正在檢測服務是否正常....‘ %current_thread().name) time.sleep(5) event.set() #set 方法將信號值 置為True def connect(): count=1 while not event.is_set(): #判斷標記為是否為True if count == 4: print(‘嘗試的次數過多,請稍後重試‘) return print(‘%s 嘗試第%s次連接...‘ %(current_thread().name,count)) event.wait(1) #括號裏的是等待時間,程序想繼續運行,除非標誌位為True或者超時,此處超時不會報錯,是繼續執行 count+=1 print(‘%s 開始連接...‘ % current_thread().name) if __name__ == ‘__main__‘: t1=Thread(target=connect) t2=Thread(target=connect) t3=Thread(target=connect) c1=Thread(target=check) t1.start() t2.start() t3.start() c1.start()
四、協程
1、單線程下實現並發:協程
並發指的多個任務看起來是同時運行的
並發實現的本質:切換+保存狀態
2、並發、並行、串行:
並發:看起來是同時運行,切換+保存狀態
並行:真正意義上的同時運行,只有在多cpu的情況下才能
實現並行,4個cpu能夠並行4個任務
串行:一個人完完整整地執行完畢才運行下一個任務
import time def consumer(): ‘‘‘任務1:接收數據,處理數據‘‘‘ while True: x=yield def producer(): ‘‘‘任務2:生產數據‘‘‘ g=consumer() next(g) for i in range(10000000): g.send(i) start=time.time() #基於yield保存狀態,實現兩個任務直接來回切換,即並發的效果 #PS:如果每個任務中都加上打印,那麽明顯地看到兩個任務的打印是你一次我一次,即並發執行的. producer() #1.0202116966247559 stop=time.time() print(stop-start)
並不是所有協程都能提升效率,如果是IO密集型的,協程會提高執行效率,然而計算密集型的切換並不能提高效率,反而會降低效率
五、單線程下實現遇到IO切換
1、greentlet可以切換,但不能遇到IO切
from greenlet import greenlet import time def eat(name): print(‘%s eat 1‘ %name) time.sleep(30) g2.switch(‘alex‘) #遇到switch切換 print(‘%s eat 2‘ %name) g2.switch() def play(name): print(‘%s play 1‘ %name) g1.switch() print(‘%s play 2‘ %name) g1=greenlet(eat) g2=greenlet(play) g1.switch(‘egon‘)
·
2、gevent切換,只能識別自己的IO操作,無法數別系統定義的IO,如time.sleep()
import gevent def eat(name): print(‘%s eat 1‘ %name) gevent.sleep(5) #gevent自定義的IO 可切換 print(‘%s eat 2‘ %name) def play(name): print(‘%s play 1‘ %name) gevent.sleep(3) print(‘%s play 2‘ %name) g1=gevent.spawn(eat,‘egon‘) g2=gevent.spawn(play,‘alex‘) # g1.join() # g2.join() gevent.joinall([g1,g2]) #無法識別,不能切換 from gevent import monkey;monkey.patch_all() import gevent import time def eat(name): print(‘%s eat 1‘ %name) time.sleep(5) #無法識別,不能切換 print(‘%s eat 2‘ %name) def play(name): print(‘%s play 1‘ %name) time.sleep(3) print(‘%s play 2‘ %name) g1=gevent.spawn(eat,‘egon‘) g2=gevent.spawn(play,‘alex‘) # g1.join() # g2.join() gevent.joinall([g1,g2])
3、若想要實現系統定義的IO切換需加上
import monkey;monkey.patch_all()
eg:
from gevent import monkey;monkey.patch_all() from threading import current_thread import gevent import time def eat(): print(‘%s eat 1‘ %current_thread().name) time.sleep(5) print(‘%s eat 2‘ %current_thread().name) def play(): print(‘%s play 1‘ %current_thread().name) time.sleep(3) print(‘%s play 2‘ %current_thread().name) g1=gevent.spawn(eat) g2=gevent.spawn(play) # gevent.sleep(100) # g1.join() # g2.join() print(current_thread().name) gevent.joinall([g1,g2])
4月28日 python學習總結 線程與協程