1. 程式人生 > >非同步、+回撥機制、執行緒queue、執行緒Event、協程、單執行緒實現遇到IO切換

非同步、+回撥機制、執行緒queue、執行緒Event、協程、單執行緒實現遇到IO切換

# from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
# import requests
# import os
# import time
# import random
#
# def get(url):
#     print('%s GET %s' %(os.getpid(),url))
#     response=requests.get(url)
#     time.sleep(random.randint(1,3))
#
#     if response.status_code == 200:
# return response.text # # def pasrse(res): # print('%s 解析結果為:%s' %(os.getpid(),len(res))) # # if __name__ == '__main__': # urls=[ # 'https://www.baidu.com', # 'https://www.baidu.com', # 'https://www.baidu.com', # 'https://www.baidu.com', # 'https://www.baidu.com',
# 'https://www.baidu.com', # 'https://www.baidu.com', # 'https://www.baidu.com', # 'https://www.python.org', # # ] # # pool=ProcessPoolExecutor(4) # objs=[] # for url in urls: # obj=pool.submit(get,url) # objs.append(obj) # # pool.shutdown(wait=True)
# # 問題: # # 1、任務的返回值不能得到及時的處理,必須等到所有任務都執行完畢才能統一進行處理 # # 2、解析的過程是序列執行的,如果解析一次需要花費2s,解析9次則需要花費18s # for obj in objs: # res=obj.result() # pasrse(res) # from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor # import requests # import os # import time # import random # # def get(url): # print('%s GET %s' %(os.getpid(),url)) # response=requests.get(url) # time.sleep(random.randint(1,3)) # # if response.status_code == 200: # pasrse(response.text) # # def pasrse(res): # print('%s 解析結果為:%s' %(os.getpid(),len(res))) # # if __name__ == '__main__': # urls=[ # 'https://www.baidu.com', # 'https://www.baidu.com', # 'https://www.baidu.com', # 'https://www.baidu.com', # 'https://www.baidu.com', # 'https://www.baidu.com', # 'https://www.baidu.com', # 'https://www.baidu.com', # 'https://www.python.org', # # ] # # pool=ProcessPoolExecutor(4) # for url in urls: # pool.submit(get,url) # # from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor # import requests # import os # import time # import random # # def get(url): # print('%s GET %s' %(os.getpid(),url)) # response=requests.get(url) # time.sleep(random.randint(1,3)) # # if response.status_code == 200: # # 幹解析的活 # return response.text # # def pasrse(obj): # res=obj.result() # print('%s 解析結果為:%s' %(os.getpid(),len(res))) # # if __name__ == '__main__': # urls=[ # 'https://www.baidu.com', # 'https://www.baidu.com', # 'https://www.baidu.com', # 'https://www.baidu.com', # 'https://www.baidu.com', # 'https://www.baidu.com', # 'https://www.baidu.com', # 'https://www.baidu.com', # 'https://www.python.org', # ] # # pool=ProcessPoolExecutor(4) # for url in urls: # obj=pool.submit(get,url) # obj.add_done_callback(pasrse) # # # 問題: # # 1、任務的返回值不能得到及時的處理,必須等到所有任務都執行完畢才能統一進行處理 # # 2、解析的過程是序列執行的,如果解析一次需要花費2s,解析9次則需要花費18s # print('主程序',os.getpid()) #解決問題: from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor from threading import current_thread import requests import os import time import random def get(url): print('%s GET %s' %(current_thread().name,url)) response=requests.get(url) time.sleep(random.randint(1,3)) if response.status_code == 200: # 幹解析的活 return response.text def pasrse(obj): res=obj.result() print('%s 解析結果為:%s' %(current_thread().name,len(res))) if __name__ == '__main__': urls=[ 'https://www.baidu.com', 'https://www.baidu.com', 'https://www.baidu.com', 'https://www.baidu.com', 'https://www.baidu.com', 'https://www.baidu.com', 'https://www.baidu.com', 'https://www.baidu.com', 'https://www.python.org', ] pool=ThreadPoolExecutor(4) for url in urls: obj=pool.submit(get,url) obj.add_done_callback(pasrse) print('主執行緒',current_thread().name)
非同步+回撥機制

 

執行緒queue:

1、佇列:先進先出

# q=queue.Queue(3) #佇列:先進先出
# q.put(1)
# q.put(2)
# q.put(3)
# # q.put(4)
#
# print(q.get())
# print(q.get())
# print(q.get())
佇列

 

2、堆疊:後進先出

# q=queue.LifoQueue(3) #堆疊:後進先出
#
# q.put('a')
# q.put('b')
# q.put('c')
#
# print(q.get())
# print(q.get())
# print(q.get())
堆疊

 

3、優先順序佇列:可以以小元組的形式往佇列理存值,第一個元素代表優先順序,數字越小優先級別越高

q=queue.PriorityQueue(3)
q.put((10,'user1'))
q.put((-3,'user2'))
q.put((-2,'user3'))


print(q.get())
print(q.get())
print(q.get())
優先順序佇列

 

Event: 程序之間協同工作

# from threading import Event,current_thread,Thread
# import time
#
# event=Event()
#
# def check():
#     print('%s 正在檢測服務是否正常....' %current_thread().name)
#     time.sleep(3)
#     event.set()
#
#
# def connect():
#     print('%s 等待連線...' %current_thread().name)
#     event.wait()
#     print('%s 開始連線...' % current_thread().name)
#
# if __name__ == '__main__':
#     t1=Thread(target=connect)
#     t2=Thread(target=connect)
#     t3=Thread(target=connect)
#
#     c1=Thread(target=check)
#
#     t1.start()
#     t2.start()
#     t3.start()
#     c1.start()




from threading import Event,current_thread,Thread
import time

event=Event()

def check():
    print('%s 正在檢測服務是否正常....' %current_thread().name)
    time.sleep(5)
    event.set()


def connect():
    count=1
    while not event.is_set():
        if count ==  4:
            print('嘗試的次數過多,請稍後重試')
            return
        print('%s 嘗試第%s次連線...' %(current_thread().name,count))
        event.wait(1)
        count+=1
    print('%s 開始連線...' % current_thread().name)

if __name__ == '__main__':
    t1=Thread(target=connect)
    t2=Thread(target=connect)
    t3=Thread(target=connect)

    c1=Thread(target=check)

    t1.start()
    t2.start()
    t3.start()
    c1.start()
Event

 

 

 

協程:

1、單執行緒下實現併發:協程

    併發指的多個任務看起來是同時執行的

    併發實現的本質:切換+儲存狀態

    併發、並行、序列:

    併發:看起來是同時執行,切換+儲存狀態

    並行:真正意義上的同時執行,只有在多cpu的情況下才能

      實現並行,4個cpu能夠並行4個任務

    序列:一個人完完整整地執行完畢才執行下一個任務

# import time
# def consumer():
#     '''任務1:接收資料,處理資料'''
#     while True:
#         x=yield
#
#
# def producer():
#     '''任務2:生產資料'''
#     g=consumer()
#     next(g)
#     for i in range(10000000):
#         g.send(i)
#
# start=time.time()
# #基於yield儲存狀態,實現兩個任務直接來回切換,即併發的效果
# #PS:如果每個任務中都加上列印,那麼明顯地看到兩個任務的列印是你一次我一次,即併發執行的.
# producer() #1.0202116966247559
#
#
# stop=time.time()
# print(stop-start)




#
# import time
# def consumer(res):
#     '''任務1:接收資料,處理資料'''
#     pass
#
# def producer():
#     '''任務2:生產資料'''
#     res=[]
#     for i in range(10000000):
#         res.append(i)
#
#     consumer(res)
#     # return res
#
# start=time.time()
# #序列執行
# res=producer()
# stop=time.time()
# print(stop-start)
協程

 

單執行緒下實現IO切換:

# from greenlet import greenlet
# import time
#
# def eat(name):
#     print('%s eat 1' %name)
#     time.sleep(30)
#     g2.switch('alex')
#     print('%s eat 2' %name)
#     g2.switch()
# def play(name):
#     print('%s play 1' %name)
#     g1.switch()
#     print('%s play 2' %name)
#
# g1=greenlet(eat)
# g2=greenlet(play)
#
# g1.switch('egon')


# import gevent
#
# def eat(name):
#     print('%s eat 1' %name)
#     gevent.sleep(5)
#     print('%s eat 2' %name)
# def play(name):
#     print('%s play 1' %name)
#     gevent.sleep(3)
#     print('%s play 2' %name)
#
# g1=gevent.spawn(eat,'egon')
# g2=gevent.spawn(play,'alex')
#
# # gevent.sleep(100)
# # g1.join()
# # g2.join()
# gevent.joinall([g1,g2])



# from gevent import monkey;monkey.patch_all()
# import gevent
# import time
#
# def eat(name):
#     print('%s eat 1' %name)
#     time.sleep(5)
#     print('%s eat 2' %name)
# def play(name):
#     print('%s play 1' %name)
#     time.sleep(3)
#     print('%s play 2' %name)
#
# g1=gevent.spawn(eat,'egon')
# g2=gevent.spawn(play,'alex')
#
# # gevent.sleep(100)
# # g1.join()
# # g2.join()
# gevent.joinall([g1,g2])






from gevent import monkey;monkey.patch_all()
from threading import current_thread
import gevent
import time

def eat():
    print('%s eat 1' %current_thread().name)
    time.sleep(5)
    print('%s eat 2' %current_thread().name)
def play():
    print('%s play 1' %current_thread().name)
    time.sleep(3)
    print('%s play 2' %current_thread().name)

g1=gevent.spawn(eat)
g2=gevent.spawn(play)

# gevent.sleep(100)
# g1.join()
# g2.join()
print(current_thread().name)
gevent.joinall([g1,g2])
程式碼