Python Journey, Chapter 9: Concurrent Programming
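I. Asynchronous calls + callback mechanism
a. Introducing the problem
Problems with the following approach:
1) A task's return value cannot be handled promptly; processing can only start once every task has finished.
2) Parsing runs serially: if one parse takes 2 s, nine parses take 18 s.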
from concurrent.futures import ProcessPoolExecutor
import os
import random
import time

import requests


def get(url):
    print('%s GET %s' % (os.getpid(), url))
    response = requests.get(url)
    time.sleep(random.randint(1, 3))  # simulate a slow download
    if response.status_code == 200:
        return response.text
    # any other status code falls through and returns None


def pasrse(res):
    print('%s parse result: %s' % (os.getpid(), len(res)))


if __name__ == '__main__':
    urls = [
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.python.org',
    ]

    pool = ProcessPoolExecutor(4)
    objs = []
    for url in urls:
        obj = pool.submit(get, url)
        objs.append(obj)

    pool.shutdown(wait=True)  # block until every task has finished

    for obj in objs:
        res = obj.result()
        pasrse(res)  # parsing runs serially in the main process
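Problem 1 can be reproduced without any network access. The following is only a minimal sketch under stated assumptions: `fake_get`, `run`, and the delays are made-up stand-ins for `get` and the URL list, and a thread pool replaces the process pool purely to keep the snippet self-contained. It records when the first result can be handled; because `shutdown(wait=True)` comes first, that moment is never earlier than the slowest task.

```python
from concurrent.futures import ThreadPoolExecutor
import time


def fake_get(delay, size):
    """Hypothetical stand-in for get(): sleep instead of downloading."""
    time.sleep(delay)
    return 'x' * size


def run(tasks):
    """tasks is a list of (delay_in_seconds, body_size) pairs."""
    start = time.monotonic()
    pool = ThreadPoolExecutor(4)
    objs = [pool.submit(fake_get, delay, size) for delay, size in tasks]
    pool.shutdown(wait=True)  # nothing is handled before this returns
    first_handled_at = time.monotonic() - start
    lengths = [len(obj.result()) for obj in objs]  # handled one by one
    return first_handled_at, lengths


if __name__ == '__main__':
    first_handled_at, lengths = run([(0.05, 5), (0.3, 30), (0.1, 10)])
    print('first result handled after %.2fs' % first_handled_at)
    print(lengths)
```

Even the task that finished after 0.05 s has to wait for the 0.3 s task before its result is looked at.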
b. Intermediate solution: solves both problems above, but couples the fetching function get and the parsing function pasrse together
from concurrent.futures import ProcessPoolExecutor
import os
import random
import time

import requests


def get(url):
    print('%s GET %s' % (os.getpid(), url))
    response = requests.get(url)
    time.sleep(random.randint(1, 3))  # simulate a slow download
    if response.status_code == 200:
        pasrse(response.text)  # parse right away, inside the worker process


def pasrse(res):
    print('%s parse result: %s' % (os.getpid(), len(res)))


if __name__ == '__main__':
    urls = [
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.python.org',
    ]

    pool = ProcessPoolExecutor(4)
    for url in urls:
        pool.submit(get, url)
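c1. Final solution: solves both problems above while keeping the fetching function get and the parsing function pasrse decoupled (process version)
The main process executes the callbacks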
from concurrent.futures import ProcessPoolExecutor
import os
import random
import time

import requests


def get(url):
    print('%s GET %s' % (os.getpid(), url))
    response = requests.get(url)
    time.sleep(random.randint(1, 3))  # simulate a slow download
    if response.status_code == 200:
        # the parsing work is left to the callback
        return response.text


def pasrse(obj):  # when the callback fires, obj passes itself to pasrse, so pasrse must take exactly one parameter
    res = obj.result()
    print('%s parse result: %s' % (os.getpid(), len(res)))


if __name__ == '__main__':
    urls = [
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.python.org',
    ]

    pool = ProcessPoolExecutor(4)
    for url in urls:
        obj = pool.submit(get, url)
        obj.add_done_callback(pasrse)  # runs as soon as this task finishes

    print('main process', os.getpid())
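Because the callback receives the finished future itself, it can also check whether the task failed or returned nothing before touching the result. The sketch below is one possible defensive callback, not the author's code: `fake_get`, `safe_pasrse`, `run`, and the `example.invalid` URLs are hypothetical, and a thread pool is used only so the snippet runs without network access or a `__main__` guard. The `Future` methods it calls (`exception()`, `result()`, `add_done_callback()`) are the same for both executor types.

```python
from concurrent.futures import ThreadPoolExecutor

results = []  # filled in by the callback


def fake_get(url):
    """Hypothetical stand-in for get(): no network involved."""
    if url.endswith('/boom'):
        raise ValueError('download failed: %s' % url)
    if url.endswith('/404'):
        return None  # mirrors get() returning None for a non-200 status
    return 'body of %s' % url


def safe_pasrse(obj):
    """Callback: receives the finished Future as its only argument."""
    if obj.exception() is not None:  # the task raised instead of returning
        results.append(('error', str(obj.exception())))
        return
    res = obj.result()
    if res is None:
        results.append(('empty', 0))
    else:
        results.append(('ok', len(res)))


def run(urls):
    results.clear()
    with ThreadPoolExecutor(2) as pool:  # leaving the block waits for all tasks
        for url in urls:
            pool.submit(fake_get, url).add_done_callback(safe_pasrse)
    return sorted(results)


if __name__ == '__main__':
    print(run(['http://example.invalid/a',
               'http://example.invalid/404',
               'http://example.invalid/boom']))
```

Without such checks, calling `len(None)` or `obj.result()` on a failed task raises inside the callback, where the exception is only logged rather than propagated to the caller.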
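c2. Final solution: solves both problems above while keeping the fetching function get and the parsing function pasrse decoupled (thread version)
Whichever pool thread becomes free (the worker that has just finished the task) executes the callback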
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
import random
import time

import requests


def get(url):
    print('%s GET %s' % (current_thread().name, url))
    response = requests.get(url)
    time.sleep(random.randint(1, 3))  # simulate a slow download
    if response.status_code == 200:
        # the parsing work is left to the callback
        return response.text


def pasrse(obj):
    res = obj.result()
    print('%s parse result: %s' % (current_thread().name, len(res)))


if __name__ == '__main__':
    urls = [
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.python.org',
    ]

    pool = ThreadPoolExecutor(4)
    for url in urls:
        obj = pool.submit(get, url)
        obj.add_done_callback(pasrse)

    print('main thread', current_thread().name)
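Which thread runs the callback can be checked directly. The following is a hedged sketch: `work`, `record`, `run`, and `late_callback_thread` are names invented for illustration, and `thread_name_prefix` (available since Python 3.6) is used only to make pool threads easy to recognise. It also shows one detail worth knowing: if the future is already finished when `add_done_callback` is called, the callback runs immediately in the calling thread.

```python
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
import time

callback_threads = []  # (result, name of the thread that ran the callback)


def work(n):
    time.sleep(0.05)  # keep the task running while the callback is attached
    return n * n


def record(obj):
    callback_threads.append((obj.result(), current_thread().name))


def run():
    callback_threads.clear()
    with ThreadPoolExecutor(2, thread_name_prefix='worker') as pool:
        for n in range(4):
            pool.submit(work, n).add_done_callback(record)
    return sorted(callback_threads)


def late_callback_thread():
    """Attach the callback only after the future is done."""
    names = []
    with ThreadPoolExecutor(1, thread_name_prefix='worker') as pool:
        obj = pool.submit(work, 3)
        obj.result()  # wait until the future has finished
        obj.add_done_callback(lambda f: names.append(current_thread().name))
    return names[0]


if __name__ == '__main__':
    print(run())                   # callbacks ran in the 'worker_*' threads
    print(late_callback_thread())  # callback ran in the calling thread
```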
II. Thread queue
import queue

q = queue.Queue(3)  # queue: first in, first out (FIFO)
q.put(1)
q.put(2)
q.put(3)
# q.put(4)  # would block, because the queue is full
print(q.get())
print(q.get())
print(q.get())

q = queue.LifoQueue(3)  # stack: last in, first out (LIFO)
q.put('a')
q.put('b')
q.put('c')
print(q.get())
print(q.get())
print(q.get())

q = queue.PriorityQueue(3)  # priority queue: store small tuples whose first element is the priority; the smaller the number, the higher the priority
q.put((10, 'user1'))
q.put((-3, 'user2'))
q.put((-2, 'user3'))
print(q.get())
print(q.get())
print(q.get())
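The retrieval order of the three queue types can be compared side by side. This is a small sketch rather than part of the original notes; `drain` and `demo` are hypothetical helper names. It also uses `put_nowait`, which raises `queue.Full` instead of blocking when the queue has no free slot.

```python
import queue


def drain(q):
    """Remove and return every item currently in the queue."""
    items = []
    while not q.empty():
        items.append(q.get())
    return items


def demo():
    fifo = queue.Queue(3)
    lifo = queue.LifoQueue(3)
    prio = queue.PriorityQueue(3)

    for item in (1, 2, 3):
        fifo.put(item)
        lifo.put(item)
    for item in ((10, 'user1'), (-3, 'user2'), (-2, 'user3')):
        prio.put(item)

    try:
        fifo.put_nowait(4)  # the queue is full: raises instead of blocking
        overflowed = False
    except queue.Full:
        overflowed = True

    return drain(fifo), drain(lifo), drain(prio), overflowed


if __name__ == '__main__':
    for part in demo():
        print(part)
    # [1, 2, 3]
    # [3, 2, 1]
    # [(-3, 'user2'), (-2, 'user3'), (10, 'user1')]
    # True
```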
III. Thread Event
a. Case 1: once check has set the event's flag, connect continues running from the line after event.wait()
from threading import Event, current_thread, Thread
import time

event = Event()  # an Event maintains an internal flag, initially False


def check():
    print('%s is checking whether the service is up....' % current_thread().name)
    time.sleep(3)
    event.set()  # set the internal flag to True


def connect():
    print('%s waiting to connect...' % current_thread().name)
    event.wait()  # block until the flag is set; event.wait(1) would wait at most 1 second
    print('%s starting to connect...' % current_thread().name)


if __name__ == '__main__':
    t1 = Thread(target=connect)
    t2 = Thread(target=connect)
    t3 = Thread(target=connect)
    c1 = Thread(target=check)

    t1.start()
    t2.start()
    t3.start()
    c1.start()
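The return value of `wait` tells the caller whether the flag was set or the timeout expired. The sketch below illustrates this with very short delays; `set_later` and `demo` are hypothetical names chosen for the example.

```python
from threading import Event, Thread
import time


def set_later(event, delay):
    """Plays the role of check(): set the flag after a short delay."""
    time.sleep(delay)
    event.set()


def demo():
    event = Event()
    before = event.wait(0.05)  # flag not set yet: times out, returns False

    setter = Thread(target=set_later, args=(event, 0.05))
    setter.start()
    after = event.wait(5)      # returns True as soon as set() is called
    setter.join()

    event.clear()              # put the flag back to False
    return before, after, event.is_set()


if __name__ == '__main__':
    print(demo())  # (False, True, False)
```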
b. Case 2: give up after three connection attempts
from threading import Event, current_thread, Thread
import time

event = Event()


def check():
    print('%s is checking whether the service is up....' % current_thread().name)
    time.sleep(5)
    event.set()


def connect():
    count = 1
    while not event.is_set():
        if count == 4:
            print('Too many attempts, please try again later')
            return
        print('%s connection attempt %s...' % (current_thread().name, count))
        event.wait(1)  # wait at most 1 second for the flag
        count += 1
    print('%s starting to connect...' % current_thread().name)


if __name__ == '__main__':
    t1 = Thread(target=connect)
    t2 = Thread(target=connect)
    t3 = Thread(target=connect)
    c1 = Thread(target=check)

    t1.start()
    t2.start()
    t3.start()
    c1.start()
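The retry loop can also be written as a function that reports its outcome, which makes it easier to reuse. This is a variant sketch, not the original code: it relies on the boolean returned by `event.wait(timeout)` instead of a separate `is_set()` check, `demo` and its parameters are hypothetical, and `threading.Timer` stands in for the `check` thread.

```python
from threading import Event, Timer


def connect(event, max_attempts=3, interval=1.0):
    """Return the attempt number that succeeded, or None after giving up."""
    for attempt in range(1, max_attempts + 1):
        if event.wait(interval):  # True means the flag was set in time
            return attempt
    return None


def demo(check_delay, interval):
    event = Event()
    checker = Timer(check_delay, event.set)  # plays the role of check()
    checker.start()
    try:
        return connect(event, max_attempts=3, interval=interval)
    finally:
        checker.cancel()  # stop the timer if we gave up before it fired
        checker.join()


if __name__ == '__main__':
    print(demo(check_delay=0.0, interval=0.5))   # service is up at once
    print(demo(check_delay=5.0, interval=0.02))  # gives up after 3 attempts
```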
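IV. Coroutines
1. Concurrency within a single thread: coroutines (the goal is higher efficiency, although not every use of coroutines improves it)
Concurrency means that multiple tasks appear to run at the same time; its essence is switching plus saving state.
An effective coroutine setup in a sense "fools" the operating system: by coordinating internally and switching to another of its own tasks whenever one hits I/O, the program makes it look as if the thread never blocks, so the thread spends more time in the ready or running state and gets a larger share of CPU time.
2. Three ways to achieve concurrency:
a) Concurrency within a single thread: switching is controlled by the program itself and is comparatively fast
b) Concurrency with multiple threads: switching is controlled by the operating system and is comparatively slower
c) Concurrency with multiple processes: switching is controlled by the operating system and is the slowest of the three
3. Saving state with yield lets two tasks switch back and forth directly, which gives the effect of concurrency (but yield does not switch automatically when a task blocks)
PS: if a print is added to each task, the output clearly alternates between the two tasks, i.e. they run concurrently.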
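import time


def consumer():
    '''Task 1: receive data and process it'''
    while True:
        x = yield  # pause here until producer sends a value


def producer():
    '''Task 2: produce data'''
    g = consumer()
    next(g)  # advance the generator to its first yield
    for i in range(10000000):
        g.send(i)


start = time.time()
producer()  # e.g. 1.0202116966247559
stop = time.time()
print(stop - start)

# Running purely computational tasks concurrently
import time


def task1():
    res = 1
    for i in range(1000000):
        res += i
        yield
        time.sleep(10000)  # yield does not skip over blocking calls automatically
        print('task1')


def task2():
    g = task1()
    res = 1
    for i in range(1000000):
        res *= i
        next(g)  # switch to task1 until its next yield
        print('task2')


start = time.time()
task2()
stop = time.time()
print(stop - start)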
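The alternation described in the PS can be made visible by recording each step instead of printing it. The sketch below is a scaled-down version of the producer and consumer pair; the `log` list and the parameter `n` are additions made for illustration.

```python
def consumer(log):
    """Task 1: receive data and record that it was processed."""
    while True:
        x = yield  # pause here until the producer sends a value
        log.append('consume %s' % x)


def producer(n):
    """Task 2: produce n items and hand each one to the consumer."""
    log = []
    g = consumer(log)
    next(g)  # advance the generator to its first yield
    for i in range(n):
        log.append('produce %s' % i)
        g.send(i)  # switch to the consumer, which resumes after its yield
    g.close()
    return log


if __name__ == '__main__':
    for line in producer(3):
        print(line)
    # produce 0
    # consume 0
    # produce 1
    # consume 1
    # produce 2
    # consume 2
```

Each `send` hands control to the consumer, and the consumer's next `yield` hands it back, with both functions keeping their local state in between.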
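V. Switching on I/O within a single thread
1. Using greenlet (more convenient switching than hand-written yield, but it does not switch automatically on I/O)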
from greenlet import greenlet
import time


def eat(name):
    print('%s eat 1' % name)
    time.sleep(30)  # blocks the whole thread; greenlet does not switch on its own
    g2.switch('alex')  # arguments are only passed on the first switch
    print('%s eat 2' % name)
    g2.switch()


def play(name):
    print('%s play 1' % name)
    g1.switch()
    print('%s play 2' % name)


g1 = greenlet(eat)
g2 = greenlet(play)

g1.switch('egon')
2. Using the gevent module (built on greenlet; without patching, it only switches on gevent's own blocking calls)
import gevent


def eat(name):
    print('%s eat 1' % name)
    gevent.sleep(5)  # with time.sleep(5) instead, no automatic switch happens
    print('%s eat 2' % name)


def play(name):
    print('%s play 1' % name)
    gevent.sleep(3)
    print('%s play 2' % name)


g1 = gevent.spawn(eat, 'egon')
g2 = gevent.spawn(play, 'alex')

# gevent.sleep(100)
# g1.join()
# g2.join()
gevent.joinall([g1, g2])  # wait for both greenlets to finish
3. Using the gevent module (built on greenlet; with monkey patching, it also switches on other blocking calls)
from gevent import monkey; monkey.patch_all()  # must run before the other imports
from threading import current_thread
import gevent
import time


def eat():
    print('%s eat 1' % current_thread().name)
    time.sleep(5)  # patched: now yields to other greenlets
    print('%s eat 2' % current_thread().name)


def play():
    print('%s play 1' % current_thread().name)
    time.sleep(3)
    print('%s play 2' % current_thread().name)


g1 = gevent.spawn(eat)
g2 = gevent.spawn(play)

# gevent.sleep(100)
# g1.join()
# g2.join()
print(current_thread().name)
gevent.joinall([g1, g2])  # wait for both greenlets to finish