1. 程式人生 > >4月28日 python學習總結 線程與協程

4月28日 python學習總結 線程與協程

回調 obj eat dom print 返回 ces cal 保存

一、 異步與回調機制  

問題:

1、任務的返回值不能得到及時的處理,必須等到所有任務都運行完畢才能統一進行處理

2、解析的過程是串行執行的,如果解析一次需要花費2s,解析9次則需要花費18s

解決一: (線程實現異步,回調解析結果)    

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
from threading import current_thread
import requests
import os
import time
import random

def get(url):
    
print(%s GET %s %(current_thread().name,url)) response=requests.get(url) time.sleep(random.randint(1,3)) if response.status_code == 200: # 幹解析的活 return response.text def pasrse(obj): res=obj.result() print(%s 解析結果為:%s %(current_thread().name,len(res))) if __name__
== __main__: urls=[ https://www.baidu.com, https://www.baidu.com, https://www.baidu.com, https://www.baidu.com, https://www.baidu.com, https://www.baidu.com, https://www.baidu.com, https://www.baidu.com, https://www.python.org
, ] pool=ThreadPoolExecutor(4) for url in urls: obj=pool.submit(get,url) #放入進程池,實現異步操作 obj.add_done_callback(pasrse) #回調,將線程執行結果當作參數傳遞給pasrse函數,線程是誰先空閑誰執行結果處理,不存在主次之分 print(主線程,current_thread().name)

   

     解決二: (進程實現異步,回調解析結果)

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import requests
import os
import time
import random

def get(url):
    print(%s GET %s %(os.getpid(),url))
    response=requests.get(url)
    time.sleep(random.randint(1,3))

    if response.status_code == 200:
        # 幹解析的活
        return response.text

def pasrse(obj):
    res=obj.result()
    print(%s 解析結果為:%s %(os.getpid(),len(res)))

if __name__ == __main__:
    urls=[
        https://www.baidu.com,
        https://www.baidu.com,
        https://www.baidu.com,
        https://www.baidu.com,
        https://www.baidu.com,
        https://www.baidu.com,
        https://www.baidu.com,
        https://www.baidu.com,
        https://www.python.org,
    ]

    pool=ProcessPoolExecutor(4)
    for url in urls:
        obj=pool.submit(get,url)           #放入進程池,實現異步操作
        obj.add_done_callback(pasrse)      #回調,將進程執行結果當作參數傳遞給pasrse函數,由主進程執行

    print(主進程,os.getpid())

二、線程queue

import queue

q=queue.Queue(3) #隊列:先進先出
q.put(1)
q.put(2)
q.put(3)
# q.put(4)

print(q.get())
print(q.get())
print(q.get())

q=queue.LifoQueue(3) #堆棧:後進先出

q.put(a)
q.put(b)
q.put(c)

print(q.get())
print(q.get())
print(q.get())

q=queue.PriorityQueue(3) #優先級隊列:可以以小元組的形式往隊列裏存值,第一個元素代表優先級,數字越小優先級越高
q.put((10,user1))
q.put((-3,user2))
q.put((-2,user3))

print(q.get())
print(q.get())
print(q.get())

三、線程Event   

from threading import Event,current_thread,Thread
import time

event=Event()      # 監聽信號 初始值為False

def check():
    print(%s 正在檢測服務是否正常.... %current_thread().name)
    time.sleep(5)
    event.set()         #set 方法將信號值 置為True


def connect():
    count=1
    while not event.is_set():        #判斷標記為是否為True
        if count ==  4:
            print(嘗試的次數過多,請稍後重試)
            return
        print(%s 嘗試第%s次連接... %(current_thread().name,count))
        event.wait(1)             #括號裏的是等待時間,程序想繼續運行,除非標誌位為True或者超時,此處超時不會報錯,是繼續執行
        count+=1
    print(%s 開始連接... % current_thread().name)

if __name__ == __main__:
    t1=Thread(target=connect)
    t2=Thread(target=connect)
    t3=Thread(target=connect)

    c1=Thread(target=check)

    t1.start()
    t2.start()
    t3.start()
    c1.start()

四、協程    

1、單線程下實現並發:協程

並發指的多個任務看起來是同時運行的

並發實現的本質:切換+保存狀態

2、並發、並行、串行:

並發:看起來是同時運行,切換+保存狀態
並行:真正意義上的同時運行,只有在多cpu的情況下才能
實現並行,4個cpu能夠並行4個任務

串行:一個人完完整整地執行完畢才運行下一個任務

import time
def consumer():
     ‘‘‘任務1:接收數據,處理數據‘‘‘
     while True:
         x=yield
 
 
def producer():
     ‘‘‘任務2:生產數據‘‘‘
     g=consumer()
     next(g)
     for i in range(10000000):
        g.send(i)

 start=time.time()
 #基於yield保存狀態,實現兩個任務直接來回切換,即並發的效果
 #PS:如果每個任務中都加上打印,那麽明顯地看到兩個任務的打印是你一次我一次,即並發執行的.
 producer() #1.0202116966247559

 stop=time.time()
 print(stop-start)

    並不是所有協程都能提升效率,如果是IO密集型的,協程會提高執行效率,然而計算密集型的切換並不能提高效率,反而會降低效率

五、單線程下實現遇到IO切換

    1、greentlet可以切換,但不能遇到IO切  

from greenlet import greenlet
import time

def eat(name):
    print(%s eat 1 %name)
    time.sleep(30)
    g2.switch(alex)        #遇到switch切換
    print(%s eat 2 %name)
    g2.switch()
def play(name):
    print(%s play 1 %name)
    g1.switch()
    print(%s play 2 %name)

g1=greenlet(eat)
g2=greenlet(play)

g1.switch(egon)

·    

    2、gevent切換,只能識別自己的IO操作,無法數別系統定義的IO,如time.sleep()

import gevent

def eat(name):
    print(%s eat 1 %name)
    gevent.sleep(5)        #gevent自定義的IO  可切換
    print(%s eat 2 %name)
def play(name):  
    print(%s play 1 %name)
    gevent.sleep(3)
    print(%s play 2 %name)

g1=gevent.spawn(eat,egon)
g2=gevent.spawn(play,alex)

# g1.join()
# g2.join()
gevent.joinall([g1,g2])



#無法識別,不能切換
from gevent import monkey;monkey.patch_all()
import gevent
import time

def eat(name):
    print(%s eat 1 %name)
    time.sleep(5)         #無法識別,不能切換
    print(%s eat 2 %name)
def play(name):
    print(%s play 1 %name)
    time.sleep(3)
    print(%s play 2 %name)

g1=gevent.spawn(eat,egon)
g2=gevent.spawn(play,alex)

# g1.join()
# g2.join()
gevent.joinall([g1,g2])

3、若想要實現系統定義的IO切換需加上       

import monkey;monkey.patch_all()

eg:

from gevent import monkey;monkey.patch_all()
from threading import current_thread
import gevent
import time

def eat():
    print(%s eat 1 %current_thread().name)
    time.sleep(5)
    print(%s eat 2 %current_thread().name)
def play():
    print(%s play 1 %current_thread().name)
    time.sleep(3)
    print(%s play 2 %current_thread().name)

g1=gevent.spawn(eat)
g2=gevent.spawn(play)

# gevent.sleep(100)
# g1.join()
# g2.join()
print(current_thread().name)
gevent.joinall([g1,g2])

4月28日 python學習總結 線程與協程