非同步+回撥機制、執行緒queue、執行緒Event、協程、單執行緒下實現IO切換
阿新 • 發佈:2018-11-13
# from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor # import requests # import os # import time # import random # # def get(url): # print('%s GET %s' %(os.getpid(),url)) # response=requests.get(url) # time.sleep(random.randint(1,3)) # # if response.status_code == 200:非同步+回撥機制# return response.text # # def pasrse(res): # print('%s 解析結果為:%s' %(os.getpid(),len(res))) # # if __name__ == '__main__': # urls=[ # 'https://www.baidu.com', # 'https://www.baidu.com', # 'https://www.baidu.com', # 'https://www.baidu.com', # 'https://www.baidu.com',# 'https://www.baidu.com', # 'https://www.baidu.com', # 'https://www.baidu.com', # 'https://www.python.org', # # ] # # pool=ProcessPoolExecutor(4) # objs=[] # for url in urls: # obj=pool.submit(get,url) # objs.append(obj) # # pool.shutdown(wait=True)# # 問題: # # 1、任務的返回值不能得到及時的處理,必須等到所有任務都執行完畢才能統一進行處理 # # 2、解析的過程是序列執行的,如果解析一次需要花費2s,解析9次則需要花費18s # for obj in objs: # res=obj.result() # pasrse(res) # from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor # import requests # import os # import time # import random # # def get(url): # print('%s GET %s' %(os.getpid(),url)) # response=requests.get(url) # time.sleep(random.randint(1,3)) # # if response.status_code == 200: # pasrse(response.text) # # def pasrse(res): # print('%s 解析結果為:%s' %(os.getpid(),len(res))) # # if __name__ == '__main__': # urls=[ # 'https://www.baidu.com', # 'https://www.baidu.com', # 'https://www.baidu.com', # 'https://www.baidu.com', # 'https://www.baidu.com', # 'https://www.baidu.com', # 'https://www.baidu.com', # 'https://www.baidu.com', # 'https://www.python.org', # # ] # # pool=ProcessPoolExecutor(4) # for url in urls: # pool.submit(get,url) # # from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor # import requests # import os # import time # import random # # def get(url): # print('%s GET %s' %(os.getpid(),url)) # response=requests.get(url) # time.sleep(random.randint(1,3)) # # if response.status_code == 200: # # 
幹解析的活 # return response.text # # def pasrse(obj): # res=obj.result() # print('%s 解析結果為:%s' %(os.getpid(),len(res))) # # if __name__ == '__main__': # urls=[ # 'https://www.baidu.com', # 'https://www.baidu.com', # 'https://www.baidu.com', # 'https://www.baidu.com', # 'https://www.baidu.com', # 'https://www.baidu.com', # 'https://www.baidu.com', # 'https://www.baidu.com', # 'https://www.python.org', # ] # # pool=ProcessPoolExecutor(4) # for url in urls: # obj=pool.submit(get,url) # obj.add_done_callback(pasrse) # # # 問題: # # 1、任務的返回值不能得到及時的處理,必須等到所有任務都執行完畢才能統一進行處理 # # 2、解析的過程是序列執行的,如果解析一次需要花費2s,解析9次則需要花費18s # print('主程序',os.getpid()) #解決問題: from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor from threading import current_thread import requests import os import time import random def get(url): print('%s GET %s' %(current_thread().name,url)) response=requests.get(url) time.sleep(random.randint(1,3)) if response.status_code == 200: # 幹解析的活 return response.text def pasrse(obj): res=obj.result() print('%s 解析結果為:%s' %(current_thread().name,len(res))) if __name__ == '__main__': urls=[ 'https://www.baidu.com', 'https://www.baidu.com', 'https://www.baidu.com', 'https://www.baidu.com', 'https://www.baidu.com', 'https://www.baidu.com', 'https://www.baidu.com', 'https://www.baidu.com', 'https://www.python.org', ] pool=ThreadPoolExecutor(4) for url in urls: obj=pool.submit(get,url) obj.add_done_callback(pasrse) print('主執行緒',current_thread().name)
執行緒queue:
1、佇列:先進先出
# q=queue.Queue(3) #佇列:先進先出 # q.put(1) # q.put(2) # q.put(3) # # q.put(4) # # print(q.get()) # print(q.get()) # print(q.get())佇列
2、堆疊:後進先出
# q=queue.LifoQueue(3) #堆疊:後進先出 # # q.put('a') # q.put('b') # q.put('c') # # print(q.get()) # print(q.get()) # print(q.get())堆疊
3、優先順序佇列:可以以小元組的形式往佇列裡存值,第一個元素代表優先順序,數字越小優先級別越高
q=queue.PriorityQueue(3) q.put((10,'user1')) q.put((-3,'user2')) q.put((-2,'user3')) print(q.get()) print(q.get()) print(q.get())優先順序佇列
Event: 執行緒之間協同工作
# from threading import Event,current_thread,Thread # import time # # event=Event() # # def check(): # print('%s 正在檢測服務是否正常....' %current_thread().name) # time.sleep(3) # event.set() # # # def connect(): # print('%s 等待連線...' %current_thread().name) # event.wait() # print('%s 開始連線...' % current_thread().name) # # if __name__ == '__main__': # t1=Thread(target=connect) # t2=Thread(target=connect) # t3=Thread(target=connect) # # c1=Thread(target=check) # # t1.start() # t2.start() # t3.start() # c1.start() from threading import Event,current_thread,Thread import time event=Event() def check(): print('%s 正在檢測服務是否正常....' %current_thread().name) time.sleep(5) event.set() def connect(): count=1 while not event.is_set(): if count == 4: print('嘗試的次數過多,請稍後重試') return print('%s 嘗試第%s次連線...' %(current_thread().name,count)) event.wait(1) count+=1 print('%s 開始連線...' % current_thread().name) if __name__ == '__main__': t1=Thread(target=connect) t2=Thread(target=connect) t3=Thread(target=connect) c1=Thread(target=check) t1.start() t2.start() t3.start() c1.start()Event
協程:
1、單執行緒下實現併發:協程
併發指的多個任務看起來是同時執行的
併發實現的本質:切換+儲存狀態
併發、並行、序列:
併發:看起來是同時執行,切換+儲存狀態
並行:真正意義上的同時執行,只有在多cpu的情況下才能
實現並行,4個cpu能夠並行4個任務
序列:一個人完完整整地執行完畢才執行下一個任務
# import time # def consumer(): # '''任務1:接收資料,處理資料''' # while True: # x=yield # # # def producer(): # '''任務2:生產資料''' # g=consumer() # next(g) # for i in range(10000000): # g.send(i) # # start=time.time() # #基於yield儲存狀態,實現兩個任務直接來回切換,即併發的效果 # #PS:如果每個任務中都加上列印,那麼明顯地看到兩個任務的列印是你一次我一次,即併發執行的. # producer() #1.0202116966247559 # # # stop=time.time() # print(stop-start) # # import time # def consumer(res): # '''任務1:接收資料,處理資料''' # pass # # def producer(): # '''任務2:生產資料''' # res=[] # for i in range(10000000): # res.append(i) # # consumer(res) # # return res # # start=time.time() # #序列執行 # res=producer() # stop=time.time() # print(stop-start)協程
單執行緒下實現IO切換:
# from greenlet import greenlet # import time # # def eat(name): # print('%s eat 1' %name) # time.sleep(30) # g2.switch('alex') # print('%s eat 2' %name) # g2.switch() # def play(name): # print('%s play 1' %name) # g1.switch() # print('%s play 2' %name) # # g1=greenlet(eat) # g2=greenlet(play) # # g1.switch('egon') # import gevent # # def eat(name): # print('%s eat 1' %name) # gevent.sleep(5) # print('%s eat 2' %name) # def play(name): # print('%s play 1' %name) # gevent.sleep(3) # print('%s play 2' %name) # # g1=gevent.spawn(eat,'egon') # g2=gevent.spawn(play,'alex') # # # gevent.sleep(100) # # g1.join() # # g2.join() # gevent.joinall([g1,g2]) # from gevent import monkey;monkey.patch_all() # import gevent # import time # # def eat(name): # print('%s eat 1' %name) # time.sleep(5) # print('%s eat 2' %name) # def play(name): # print('%s play 1' %name) # time.sleep(3) # print('%s play 2' %name) # # g1=gevent.spawn(eat,'egon') # g2=gevent.spawn(play,'alex') # # # gevent.sleep(100) # # g1.join() # # g2.join() # gevent.joinall([g1,g2]) from gevent import monkey;monkey.patch_all() from threading import current_thread import gevent import time def eat(): print('%s eat 1' %current_thread().name) time.sleep(5) print('%s eat 2' %current_thread().name) def play(): print('%s play 1' %current_thread().name) time.sleep(3) print('%s play 2' %current_thread().name) g1=gevent.spawn(eat) g2=gevent.spawn(play) # gevent.sleep(100) # g1.join() # g2.join() print(current_thread().name) gevent.joinall([g1,g2])程式碼