
Python Journey, Chapter 9: Concurrent Programming


1. Asynchronous calls + the callback mechanism

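a. The problem

Problems:

1) A task's return value cannot be handled as soon as it is ready. The code has to wait until every task has finished, and only then process all the results together.

2) Parsing runs serially. If one parse takes 2 s, nine parses take 18 s.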

```python
from concurrent.futures import ProcessPoolExecutor
import os
import requests
import time
import random


def get(url):
    print('%s GET %s' % (os.getpid(), url))
    response = requests.get(url)
    time.sleep(random.randint(1, 3))  # simulate extra network latency
    if response.status_code == 200:
        return response.text


def parse(res):
    print('%s parse result: %s' % (os.getpid(), len(res)))


if __name__ == '__main__':
    urls = ['https://www.baidu.com'] * 8 + ['https://www.python.org']

    pool = ProcessPoolExecutor(4)
    objs = []
    for url in urls:
        obj = pool.submit(get, url)
        objs.append(obj)
    pool.shutdown(wait=True)  # block until every task has finished

    # Only now are the results handled, one after another, in the main process
    for obj in objs:
        res = obj.result()
        parse(res)
```
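The original does not use it, but `concurrent.futures.as_completed` already addresses problem 1. It yields each future as soon as that future finishes, so results can be handled one at a time instead of after `shutdown`.

The sketch below makes two substitutions so it runs anywhere:

- `requests` is replaced by a hypothetical `fake_get` that sleeps to simulate network latency.
- A thread pool is used instead of a process pool.

Parsing still runs serially in the calling thread, so problem 2 remains.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def fake_get(url):
    """Hypothetical stand-in for get(): simulate latency and return a fake page body."""
    time.sleep(random.uniform(0.05, 0.2))
    return url * 3  # pretend the page body is the URL repeated three times


def parse(res):
    return len(res)


def crawl(urls):
    results = []
    with ThreadPoolExecutor(4) as pool:
        # Map each future back to its URL so each result can be labelled
        futures = {pool.submit(fake_get, url): url for url in urls}
        # as_completed yields futures in completion order, not submission order
        for fut in as_completed(futures):
            results.append((futures[fut], parse(fut.result())))
    return results


if __name__ == '__main__':
    urls = ['https://www.baidu.com'] * 8 + ['https://www.python.org']
    for url, size in crawl(urls):
        print(url, size)
```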

b. An improved solution: this solves both problems above, but it couples the fetching function `get` and the parsing function `parse` together.

```python
from concurrent.futures import ProcessPoolExecutor
import requests
import os
import time
import random


def get(url):
    print('%s GET %s' % (os.getpid(), url))
    response = requests.get(url)
    time.sleep(random.randint(1, 3))  # simulate extra network latency
    if response.status_code == 200:
        parse(response.text)  # get() calls the parser directly: the two are coupled


def parse(res):
    print('%s parse result: %s' % (os.getpid(), len(res)))


if __name__ == '__main__':
    urls = ['https://www.baidu.com'] * 8 + ['https://www.python.org']

    pool = ProcessPoolExecutor(4)
    for url in urls:
        pool.submit(get, url)
    pool.shutdown(wait=True)
```

c1. Final solution: this solves both problems above and also decouples the fetching function `get` from the parsing function `parse` (process version).

The main process executes the callbacks. The worker processes only fetch.

```python
from concurrent.futures import ProcessPoolExecutor
import requests
import os
import time
import random


def get(url):
    print('%s GET %s' % (os.getpid(), url))
    response = requests.get(url)
    time.sleep(random.randint(1, 3))  # simulate extra network latency
    if response.status_code == 200:
        # Only fetch here; parsing is left to the callback
        return response.text


def parse(obj):
    # The finished future passes itself to the callback,
    # so parse must take exactly one argument
    res = obj.result()
    print('%s parse result: %s' % (os.getpid(), len(res)))


if __name__ == '__main__':
    urls = ['https://www.baidu.com'] * 8 + ['https://www.python.org']

    pool = ProcessPoolExecutor(4)
    for url in urls:
        obj = pool.submit(get, url)
        obj.add_done_callback(parse)  # parse(obj) runs as soon as this task finishes

    print('main process', os.getpid())
    pool.shutdown(wait=True)
```

c2. Final solution: this solves both problems above and also decouples the fetching function `get` from the parsing function `parse` (thread version).

The callback runs in the worker thread that finished the task. If the task has already finished when `add_done_callback` is called, the callback runs immediately in the calling thread instead.

```python
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
import requests
import time
import random


def get(url):
    print('%s GET %s' % (current_thread().name, url))
    response = requests.get(url)
    time.sleep(random.randint(1, 3))  # simulate extra network latency
    if response.status_code == 200:
        # Only fetch here; parsing is left to the callback
        return response.text


def parse(obj):
    res = obj.result()
    print('%s parse result: %s' % (current_thread().name, len(res)))


if __name__ == '__main__':
    urls = ['https://www.baidu.com'] * 8 + ['https://www.python.org']

    pool = ThreadPoolExecutor(4)
    for url in urls:
        obj = pool.submit(get, url)
        obj.add_done_callback(parse)

    print('main thread', current_thread().name)
    pool.shutdown(wait=True)
```
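To check which thread actually runs a callback, the sketch below makes each task wait on a `threading.Event` until its callback has been attached. This guarantees that the future cannot finish early.

Under that assumption, the callback runs in the worker thread that finished the task. According to the `concurrent.futures` documentation, a callback attached to an already finished future is instead called immediately in the attaching thread.

The names `work`, `on_done`, `run`, and the `worker` thread prefix are illustrative.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def work(n, ready):
    ready.wait()  # do not finish until the main thread has attached the callback
    return n * n


def run(count=4):
    seen = []  # (result, name of the thread that ran the callback)
    lock = threading.Lock()

    def on_done(fut):
        with lock:
            seen.append((fut.result(), threading.current_thread().name))

    with ThreadPoolExecutor(2, thread_name_prefix='worker') as pool:
        for n in range(count):
            ready = threading.Event()
            fut = pool.submit(work, n, ready)
            fut.add_done_callback(on_done)  # attached while the task is still pending
            ready.set()  # now let the task finish
    return seen


if __name__ == '__main__':
    for result, thread_name in run():
        print(result, 'handled in', thread_name)
```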

2. Thread-safe queues: the queue module

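```python
import queue

q = queue.Queue(3)  # queue: first in, first out (FIFO), holds at most 3 items
q.put(1)
q.put(2)
q.put(3)
# q.put(4)  # would block: the queue is full until someone calls get()
print(q.get())  # 1
print(q.get())  # 2
print(q.get())  # 3

q = queue.LifoQueue(3)  # stack: last in, first out (LIFO)
q.put('a')
q.put('b')
q.put('c')
print(q.get())  # c
print(q.get())  # b
print(q.get())  # a

# Priority queue: store tuples whose first element is the priority;
# the smaller the number, the higher the priority
q = queue.PriorityQueue(3)
q.put((10, 'user1'))
q.put((-3, 'user2'))
q.put((-2, 'user3'))
print(q.get())  # (-3, 'user2')
print(q.get())  # (-2, 'user3')
print(q.get())  # (10, 'user1')
```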
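These queue classes are thread safe, which is why they are used between threads. The typical pattern is one thread putting work in and another taking it out.

Below is a minimal producer/consumer sketch. It assumes a `None` sentinel marks the end of the work; the function names are illustrative.

```python
import queue
import threading

SENTINEL = None  # assumption of this sketch: None means "no more items"


def producer(q, items):
    for item in items:
        q.put(item)  # blocks while the queue is full
    q.put(SENTINEL)


def consumer(q, out):
    while True:
        item = q.get()  # blocks while the queue is empty
        if item is SENTINEL:
            break
        out.append(item * 2)


def run(items):
    q = queue.Queue(3)  # small bound, so the producer really has to wait
    out = []
    threads = [
        threading.Thread(target=producer, args=(q, items)),
        threading.Thread(target=consumer, args=(q, out)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return out


if __name__ == '__main__':
    print(run(range(10)))  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```

With a single consumer, the FIFO queue keeps the items in order.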

3. Thread Event

a. Example 1: each connect thread blocks at event.wait() and continues only after check() sets the event

```python
from threading import Event, current_thread, Thread
import time

event = Event()  # an Event manages an internal flag that starts out False


def check():
    print('%s checking whether the service is up....' % current_thread().name)
    time.sleep(3)
    event.set()  # set the flag to True, waking every thread blocked in wait()


def connect():
    print('%s waiting to connect...' % current_thread().name)
    event.wait()  # block until the flag is set; event.wait(1) would wait at most 1 second
    print('%s start connecting...' % current_thread().name)


if __name__ == '__main__':
    t1 = Thread(target=connect)
    t2 = Thread(target=connect)
    t3 = Thread(target=connect)
    c1 = Thread(target=check)
    t1.start()
    t2.start()
    t3.start()
    c1.start()
```
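A short demonstration of the `Event` methods used here:

- `wait(timeout)` returns the flag's value, so `False` means the wait timed out.
- `set()` makes later waits return at once.
- `clear()` resets the flag.

```python
from threading import Event

event = Event()
print(event.is_set())   # False: the flag starts out False
print(event.wait(0.1))  # False: nobody called set(), so the wait timed out after 0.1 s
event.set()
print(event.wait(0.1))  # True: returns immediately once the flag is set
event.clear()           # reset the flag so later wait() calls block again
print(event.is_set())   # False
```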

b. Example 2: give up after three attempts

```python
from threading import Event, current_thread, Thread
import time

event = Event()


def check():
    print('%s checking whether the service is up....' % current_thread().name)
    time.sleep(5)
    event.set()


def connect():
    count = 1
    while not event.is_set():
        if count == 4:
            print('Too many attempts, please try again later')
            return
        print('%s connection attempt %s...' % (current_thread().name, count))
        event.wait(1)  # wait at most 1 second for the flag
        count += 1
    print('%s start connecting...' % current_thread().name)


if __name__ == '__main__':
    t1 = Thread(target=connect)
    t2 = Thread(target=connect)
    t3 = Thread(target=connect)
    c1 = Thread(target=check)
    t1.start()
    t2.start()
    t3.start()
    c1.start()
```
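Because `event.wait(timeout)` returns `True` when the flag is set and `False` on timeout, the retry loop can also be written without the separate `is_set()` check.

This is a sketch under that reading. The `attempts` and `timeout` parameters, and the 1.5 s delay in the demo, are illustrative.

```python
from threading import Event, Thread, current_thread
import time


def connect(event, attempts=3, timeout=1):
    """Try up to `attempts` times; wait() returns True as soon as the event is set."""
    for count in range(1, attempts + 1):
        print('%s connection attempt %s...' % (current_thread().name, count))
        if event.wait(timeout):
            print('%s start connecting...' % current_thread().name)
            return True
    print('Too many attempts, please try again later')
    return False


if __name__ == '__main__':
    event = Event()

    def check():
        time.sleep(1.5)  # the service becomes "ready" after 1.5 s
        event.set()

    Thread(target=check).start()
    Thread(target=connect, args=(event,)).start()
```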

4. Coroutines

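1. Concurrency within a single thread: coroutines. The goal is better efficiency, although not every use of coroutines actually improves it.

Concurrency means that multiple tasks appear to run at the same time. What makes it possible is switching between tasks while saving each task's state.

An effective coroutine, in a sense, fools the operating system. The program coordinates internally and switches to another of its own tasks whenever one of them hits I/O. From the OS's point of view the thread never blocks, so it stays in the ready or running state more often and receives a larger share of the CPU.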

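2. Three ways to achieve concurrency:

a) Concurrency within a single thread: switching is controlled by the program itself, so it is relatively cheap and fast.

b) Concurrency with multiple threads: switching is controlled by the operating system, so it is slower.

c) Concurrency with multiple processes: switching is controlled by the operating system, and it is the slowest of the three.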
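The idea of "switching + saving state" within a single thread can be made concrete with plain generators. Each `yield` saves a task's local variables and hands control back to a tiny scheduler, which then resumes the next task.

This round-robin scheduler (`task`, `run`) is a hypothetical illustration, not a library API.

```python
from collections import deque


def task(name, steps, trace):
    for i in range(steps):
        trace.append('%s step %s' % (name, i))  # local state (i) survives each yield
        yield  # switch point: hand control back to the scheduler


def run(tasks):
    ready = deque(tasks)
    while ready:
        t = ready.popleft()
        try:
            next(t)  # resume the task until its next yield
        except StopIteration:
            continue  # task finished: drop it
        ready.append(t)  # not finished: send it to the back of the line


if __name__ == '__main__':
    trace = []
    run([task('A', 3, trace), task('B', 2, trace)])
    print(trace)  # ['A step 0', 'B step 0', 'A step 1', 'B step 1', 'A step 2']
```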

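3. Using yield to save state lets two tasks switch back and forth directly, which gives the effect of concurrency. However, yield does not switch automatically when a task blocks.

PS: If you add a print to each task, you can clearly see the two tasks' output alternate, one after the other. In other words, they execute concurrently.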

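```python
import time


def consumer():
    '''Task 1: receive and process data'''
    while True:
        x = yield  # receive the next value passed to send()


def producer():
    '''Task 2: produce data'''
    g = consumer()
    next(g)  # prime the generator so it waits at its first yield
    for i in range(10000000):
        g.send(i)  # switch to consumer; control returns at its next yield


start = time.time()
producer()  # took about 1.02 s in the original run
stop = time.time()
print(stop - start)
```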
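Following the PS above, here is a variant of the consumer/producer pair that records what each side does instead of discarding the data. It also shows that `send()` both delivers a value and returns whatever the generator yields next.

The `log` list and the running total are additions for illustration.

```python
def consumer(log):
    """Task 1: receive data and process it (here: keep a running total)."""
    total = 0
    while True:
        x = yield total  # pause here; resume when the producer sends a value
        total += x
        log.append('consumer got %s' % x)


def producer(n, log):
    """Task 2: produce data and hand each item to the consumer."""
    g = consumer(log)
    next(g)  # prime the generator: run it up to the first yield
    total = 0
    for i in range(n):
        log.append('producer sent %s' % i)
        total = g.send(i)  # switch to the consumer; it switches back at its next yield
    return total


if __name__ == '__main__':
    log = []
    print(producer(3, log))  # 3
    print(log)
```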

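```python
# Pure computation tasks executed concurrently
import time


def task1():
    res = 1
    for i in range(1000000):
        res += i
        yield  # hand control back to task2
        # time.sleep(10000)  # if enabled, the whole thread blocks here: yield does not skip blocking calls


def task2():
    g = task1()
    res = 1
    for i in range(1000000):
        res *= i
        next(g)  # switch to task1 for one step
    print('task2')


start = time.time()
task2()
stop = time.time()
print(stop - start)
```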
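For purely computational work, switching gains nothing because there is no I/O wait to hide; it only adds overhead.

A rough way to see this is sketched below, assuming a simple summation stands in for "computation". The function names are illustrative. Exact timings vary by machine, though the interleaved version is typically the slower one.

```python
import time


def serial(n):
    a = b = 0
    for i in range(n):
        a += i
    for i in range(n):
        b += i
    return a + b


def stepper(n, acc):
    # acc is a one-element list so the caller can read the running sum
    for i in range(n):
        acc[0] += i
        yield  # switch after every single step


def interleaved(n):
    acc1, acc2 = [0], [0]
    g1, g2 = stepper(n, acc1), stepper(n, acc2)
    for _ in range(n):
        next(g1)
        next(g2)
    return acc1[0] + acc2[0]


if __name__ == '__main__':
    n = 1000000
    for fn in (serial, interleaved):
        start = time.time()
        result = fn(n)
        print(fn.__name__, result, round(time.time() - start, 3), 's')
```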

5. Switching on I/O within a single thread

1 greenlet: makes explicit switching more convenient than raw yield, but it does not switch automatically on I/O

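```python
from greenlet import greenlet
import time


def eat(name):
    print('%s eat 1' % name)
    time.sleep(30)  # blocks the whole thread: greenlet does not switch on I/O by itself
    g2.switch('alex')  # on the first switch into g2, the arguments become play's arguments
    print('%s eat 2' % name)
    g2.switch()


def play(name):
    print('%s play 1' % name)
    g1.switch()
    print('%s play 2' % name)


g1 = greenlet(eat)
g2 = greenlet(play)
g1.switch('egon')  # start eat('egon')
```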

2 The gevent module: built on greenlet. Without monkey patching, it switches only on gevent's own blocking calls, such as gevent.sleep

```python
import gevent


def eat(name):
    print('%s eat 1' % name)
    gevent.sleep(5)  # with time.sleep(5) instead, gevent would not switch here
    print('%s eat 2' % name)


def play(name):
    print('%s play 1' % name)
    gevent.sleep(3)
    print('%s play 2' % name)


g1 = gevent.spawn(eat, 'egon')
g2 = gevent.spawn(play, 'alex')

# Alternatives to joinall:
# gevent.sleep(100)
# g1.join()
# g2.join()

gevent.joinall([g1, g2])  # wait until both greenlets have finished
```
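The standard library's `asyncio` offers the same cooperative model without a third-party package. Only awaited calls such as `asyncio.sleep` switch tasks, just as only `gevent.sleep` does here, while a plain `time.sleep` would block every task.

Below is a sketch of the eat/play example with shortened delays. It swaps gevent for asyncio and is not gevent's API.

```python
import asyncio


async def eat(name, log):
    log.append('%s eat 1' % name)
    await asyncio.sleep(0.5)  # switch point; time.sleep(0.5) here would block both tasks
    log.append('%s eat 2' % name)


async def play(name, log):
    log.append('%s play 1' % name)
    await asyncio.sleep(0.3)
    log.append('%s play 2' % name)


async def main():
    log = []
    # gather plays the role of gevent.joinall: run both and wait for both
    await asyncio.gather(eat('egon', log), play('alex', log))
    return log


if __name__ == '__main__':
    for line in asyncio.run(main()):
        print(line)
```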

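3 The gevent module with monkey patching: patch_all() replaces blocking standard-library calls such as time.sleep with cooperative versions, so gevent switches on them too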

```python
from gevent import monkey
monkey.patch_all()  # patch before importing anything else that performs blocking I/O

from threading import current_thread
import gevent
import time


def eat():
    print('%s eat 1' % current_thread().name)
    time.sleep(5)  # patched: now yields to other greenlets
    print('%s eat 2' % current_thread().name)


def play():
    print('%s play 1' % current_thread().name)
    time.sleep(3)
    print('%s play 2' % current_thread().name)


g1 = gevent.spawn(eat)
g2 = gevent.spawn(play)

# Alternatives to joinall:
# gevent.sleep(100)
# g1.join()
# g2.join()

print(current_thread().name)
gevent.joinall([g1, g2])
```
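asyncio has no equivalent of `monkey.patch_all()`. When a blocking standard-library call cannot be replaced, a common substitute is to push it onto a worker thread with `asyncio.to_thread` (Python 3.9+), so other tasks keep running.

This is a sketch under that assumption, with shortened delays. The task names are illustrative.

```python
import asyncio
import time


async def blocking_task(name, seconds, log):
    log.append('%s start' % name)
    # time.sleep blocks, so run it in a worker thread instead of on the event loop
    await asyncio.to_thread(time.sleep, seconds)
    log.append('%s done' % name)


async def main():
    log = []
    start = time.perf_counter()
    await asyncio.gather(blocking_task('eat', 0.5, log), blocking_task('play', 0.3, log))
    return log, time.perf_counter() - start


if __name__ == '__main__':
    log, elapsed = asyncio.run(main())
    print(log)
    print('elapsed about %.1f s (the sleeps overlap instead of adding up to 0.8 s)' % elapsed)
```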
