1. 程式人生 > >並發編程(六)

並發編程(六)

lose 接收數據 append 不可 png coroutine ber fc7 spool

異步調用與回調機制

在之前我們有了解過異步調用機制,當我們在發起異步調用後,並不會等待任務結束才返回,而是直接執行下一行代碼,如果異步功能用狀態來通知,那麽調用者就需要每隔一定時間檢查一次,效率就很低,如果是使用通知的方式,效率則很高,因為異步功能幾乎不需要做額外的操作。至於回調函數,其實和通知沒太多區別。

#舉例:
#1. multiprocessing.Pool().apply_async() #發起異步調用後,並不會等待任務結束才返回,相反,會立即獲取一個臨時結果(並不是最終的結果,可能是封裝好的一個對象)。
#2. concurrent.futures.ProcessPoolExecutor(3).submit(func,)
#3. concurrent.futures.ThreadPoolExecutor(3).submit(func,)

技術分享圖片
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
from threading import current_thread
import requests
import os
import time
import random

def get(url):
    print(%s GET %s %(current_thread().name,url))
    response
=requests.get(url) time.sleep(random.randint(1,3)) if response.status_code == 200: # 幹解析的活 return response.text def pasrse(obj): res=obj.result() print(%s 解析結果為:%s %(current_thread().name,len(res))) if __name__ == __main__: urls=[ https://www.baidu.com
, https://www.baidu.com, https://www.baidu.com, https://www.baidu.com, https://www.baidu.com, https://www.baidu.com, https://www.baidu.com, https://www.baidu.com, https://www.python.org, ] pool=ThreadPoolExecutor(4) for url in urls: obj=pool.submit(get,url) obj.add_done_callback(pasrse) print(主線程,current_thread().name)
View Code

線程queue

queue隊列 :使用import queue,用法與進程Queue一樣

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

class queue.Queue(maxsize=0) #先進先出

技術分享圖片
import queue

q=queue.Queue()
q.put(first)
q.put(second)
q.put(third)

print(q.get())
print(q.get())
print(q.get())
‘‘‘
結果(先進先出):
first
second
third
‘‘‘
View Code

class queue.LifoQueue(maxsize=0) #last in fisrt out

技術分享圖片
import queue

q=queue.LifoQueue()
q.put(first)
q.put(second)
q.put(third)

print(q.get())
print(q.get())
print(q.get())
‘‘‘
結果(後進先出):
third
second
first
‘‘‘
View Code

class queue.PriorityQueue(maxsize=0) #存儲數據時可設置優先級的隊列

技術分享圖片
import queue

q=queue.PriorityQueue()
#put進入一個元組,元組的第一個元素是優先級(通常是數字,也可以是非數字之間的比較),數字越小優先級越高
q.put((20,a))
q.put((10,b))
q.put((30,c))

print(q.get())
print(q.get())
print(q.get())
‘‘‘
結果(數字越小優先級越高,優先級高的優先出隊):
(10, ‘b‘)
(20, ‘a‘)
(30, ‘c‘)
‘‘‘
View Code

技術分享圖片
Constructor for a priority queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.

The lowest valued entries are retrieved first (the lowest valued entry is the one returned by sorted(list(entries))[0]). A typical pattern for entries is a tuple in the form: (priority_number, data).

exception queue.Empty
Exception raised when non-blocking get() (or get_nowait()) is called on a Queue object which is empty.

exception queue.Full
Exception raised when non-blocking put() (or put_nowait()) is called on a Queue object which is full.

Queue.qsize()
Queue.empty() #return True if empty  
Queue.full() # return True if full 
Queue.put(item, block=True, timeout=None)
Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).

Queue.put_nowait(item)
Equivalent to put(item, False).

Queue.get(block=True, timeout=None)
Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).

Queue.get_nowait()
Equivalent to get(False).

Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads.

Queue.task_done()
Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.

Queue.join() block直到queue被消費完畢
其他

線程Event

同進程的一樣

線程的一個關鍵特性是每個線程都是獨立運行且狀態不可預測。如果程序中的其 他線程需要通過判斷某個線程的狀態來確定自己下一步的操作,這時線程同步問題就會變得非常棘手。為了解決這些問題,我們需要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標誌,它允許線程等待某些事件的發生。在 初始情況下,Event對象中的信號標誌被設置為假。如果有線程等待一個Event對象, 而這個Event對象的標誌為假,那麽這個線程將會被一直阻塞直至該標誌為真。一個線程如果將一個Event對象的信號標誌設置為真,它將喚醒所有等待這個Event對象的線程。如果一個線程等待一個已經被設置為真的Event對象,那麽它將忽略這個事件, 繼續執行

event.isSet():返回event的狀態值;

event.wait():如果 event.isSet()==False將阻塞線程;

event.set(): 設置event的狀態值為True,所有阻塞池的線程激活進入就緒狀態, 等待操作系統調度;

event.clear():恢復event的狀態值為False。

技術分享圖片

技術分享圖片
from threading import Thread,Event
import threading
import time,random
def conn_mysql():
    count=1
    while not event.is_set():
        if count > 3:
            raise TimeoutError(鏈接超時)
        print(<%s>第%s次嘗試鏈接 % (threading.current_thread().getName(), count))
        event.wait(0.5)
        count+=1
    print(<%s>鏈接成功 %threading.current_thread().getName())


def check_mysql():
    print(\033[45m[%s]正在檢查mysql\033[0m % threading.current_thread().getName())
    time.sleep(random.randint(2,4))
    event.set()
if __name__ == __main__:
    event=Event()
    conn1=Thread(target=conn_mysql)
    conn2=Thread(target=conn_mysql)
    check=Thread(target=check_mysql)

    conn1.start()
    conn2.start()
    check.start()
View Code

協程

協程:是單線程下的並發,又稱微線程,纖程。英文名Coroutine。一句話說明什麽是線程:協程是一種用戶態的輕量級線程,即協程是由用戶程序自己控制調度的。

協程主要就是以單線程來實現高並發,就是使用一個主線程,可以利用的CPU資源只有一個,而我們所了解的並發的本質就是:切換+保存狀態

cpu正在運行一個任務,會在兩種情況下切走去執行其他的任務(切換由操作系統強制控制),一種情況是該任務發生了阻塞,另外一種情況是該任務計算的時間過長或有一個優先級更高的程序替代了它

其中第二種情況並不能提升效率,只是為了讓cpu能夠雨露均沾,實現看起來所有任務都被“同時”執行的效果,如果多個任務都是純計算的,這種切換反而會降低效率。

我們可以基於yeird來驗證:

#1 yiled可以保存狀態,yield的狀態保存與操作系統的保存線程狀態很像,但是yield是代碼級別控制的,更輕量級
#2 send可以把一個函數的結果傳給另外一個函數,以此實現單線程內程序之間的切換 
技術分享圖片
#串行執行
import time
def consumer(res):
    ‘‘‘任務1:接收數據,處理數據‘‘‘
    pass

def producer():
    ‘‘‘任務2:生產數據‘‘‘
    res=[]
    for i in range(10000000):
        res.append(i)
    return res

start=time.time()
#串行執行
res=producer()
consumer(res) #寫成consumer(producer())會降低執行效率
stop=time.time()
print(stop-start) #1.5536692142486572



#基於yield並發執行
import time
def consumer():
    ‘‘‘任務1:接收數據,處理數據‘‘‘
    while True:
        x=yield

def producer():
    ‘‘‘任務2:生產數據‘‘‘
    g=consumer()
    next(g)
    for i in range(10000000):
        g.send(i)

start=time.time()
#基於yield保存狀態,實現兩個任務直接來回切換,即並發的效果
#PS:如果每個任務中都加上打印,那麽明顯地看到兩個任務的打印是你一次我一次,即並發執行的.
producer()

stop=time.time()
print(stop-start) #2.0272178649902344
單純的切換反而會降低運行效率

第一種情況的切換。在任務一遇到io情況下,切到任務二去執行,這樣就可以利用任務一阻塞的時間完成任務二的計算,效率的提升就在於此。

技術分享圖片
import time
def consumer():
    ‘‘‘任務1:接收數據,處理數據‘‘‘
    while True:
        x=yield

def producer():
    ‘‘‘任務2:生產數據‘‘‘
    g=consumer()
    next(g)
    for i in range(10000000):
        g.send(i)
        time.sleep(2)

start=time.time()
producer() #並發執行,但是任務producer遇到io就會阻塞住,並不會切到該線程內的其他任務去執行

stop=time.time()
print(stop-start)

yield並不能實現遇到io切換
View Code

對於單線程下,我們不可避免程序中出現io操作,但如果我們能在自己的程序中(即用戶程序級別,而非操作系統級別)控制單線程下的多個任務能在一個任務遇到io阻塞時就切換到另外一個任務去計算,這樣就保證了該線程能夠最大限度地處於就緒態,即隨時都可以被cpu執行的狀態,相當於我們在用戶程序級別將自己的io操作最大限度地隱藏起來,從而可以迷惑操作系統,讓其看到:該線程好像是一直在計算,io比較少,從而更多的將cpu的執行權限分配給我們的線程。

協程的本質就是在單線程下,由用戶自己控制一個任務遇到io阻塞了就切換另外一個任務去執行,以此來提升效率。

#1. python的線程屬於內核級別的,即由操作系統控制調度(如單線程遇到io或執行時間過長就會被迫交出cpu執行權限,切換其他線程運行)
#2. 單線程內開啟協程,一旦遇到io,就會從應用程序級別(而非操作系統)控制切換,以此來提升效率(!!!非io操作的切換與效率無關)

對比操作系統控制線程的切換,用戶在單線程內控制協程的切換

優點如下:

#1. 協程的切換開銷更小,屬於程序級別的切換,操作系統完全感知不到,因而更加輕量級
#2. 單線程內就可以實現並發的效果,最大限度地利用cpu

缺點如下:

#1. 協程的本質是單線程下,無法利用多核,可以是一個程序開啟多個進程,每個進程內開啟多個線程,每個線程內開啟協程
#2. 協程指的是單個線程,因而一旦協程出現阻塞,將會阻塞整個線程

總結協程特點:

  1. 必須在只有一個單線程裏實現並發
  2. 修改共享數據不需加鎖
  3. 用戶程序裏自己保存多個控制流的上下文棧
  4. 附加:一個協程遇到IO操作自動切換到其它協程(如何實現檢測IO,yield、greenlet都無法實現,就用到了gevent模塊(select機制))

在python當中其實並沒有協程這個屬性,而是程序員們默認的將單線程下的並發叫做協程

技術分享圖片
from gevent import monkey;monkey.patch_all()
from threading import current_thread
import gevent
import time

def eat():
    print(%s eat 1 %current_thread().name)
    time.sleep(5)
    print(%s eat 2 %current_thread().name)
def play():
    print(%s play 1 %current_thread().name)
    time.sleep(3)
    print(%s play 2 %current_thread().name)

g1=gevent.spawn(eat)
g2=gevent.spawn(play)

# gevent.sleep(100)
# g1.join()
# g2.join()
print(current_thread().name)
gevent.joinall([g1,g2])
單線程下實現遇IO切換

並發編程(六)