Python基礎34_執行緒-條件,定時器,佇列,執行緒池, 協程
執行緒
一. 條件
使得執行緒等待,只有滿足某條件時,才釋放n個執行緒
import time
from threading import Thread,RLock,Condition,current_thread
def func1(c):
c.acquire(False) #固定格式
# print(1111)
c.wait() #等待通知,
time.sleep(3) #通知完成後大家是序列執行的,這也看出了鎖的機制了
print('%s執行了'%(current_thread().getName()))
c.release()
if __name__ == '__main__':
c = Condition()
for i in range(5):
t = Thread(target=func1,args=(c,))
t.start()
while True:
num = int(input('請輸入你要通知的執行緒個數:'))
c.acquire() #固定格式
c.notify(num) #通知num個執行緒別等待了,去執行吧
c.release()
#結果分析:
# 請輸入你要通知的執行緒個數:3
# 請輸入你要通知的執行緒個數:Thread-1執行了 #有時候你會發現的你結果列印在了你要輸入內容的地方,這是列印的問題,沒關係,不影響
# Thread-3執行了
# Thread-2執行了
二. 定時器
定時器,指定n秒後執行某個操作,這個做定時任務的時候可能會用到
import time
from threading import Timer,current_thread #這裡就不需要再引入Timer
import threading
def hello():
print(current_thread().getName())
print("hello, world")
# time.sleep(3) #如果你的子執行緒的程式執行時間比較長,那麼這個定時任務也會亂,當然了,主要還是看業務需求
t = Timer(10, hello) #建立一個子執行緒去執行後面的函式
t.start() # after 1 seconds, "hello, world" will be printed
# for i in range(5):
# t = Timer(2, hello)
# t.start()
# time.sleep(3) #這個是建立一個t用的時間是2秒,創建出來第二個的時候,第一個已經過了兩秒了,所以你的5個t的執行結果基本上就是2秒中,這個延遲操作。
print(threading.active_count())
print('主程序',current_thread().getName())
三. 執行緒佇列
queue佇列 :使用import queue,用法與程序Queue一樣
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
1. class queue.Queue(maxsize=0) 先進先出
import queue
q = queue.Queue(3)
q.put(1)
q.put(2)
print("當前佇列長度: ", q.qsize())
q.put(3)
print("檢視佇列狀態: ", q.full())
try:
q.put_nowait(4)
except Exception:
print("當前佇列已滿")
print(q.get())
print(q.get())
print(q.get())
print("檢視佇列狀態: ", q.empty())
try:
q.get_nowait()
except Exception:
print("佇列已空")
2. class queue.LifoQueue(maxsize=0) 後進先出
q = queue.LifoQueue(3)
q.put(1)
q.put(2)
q.put(3)
print("檢視佇列狀態: " ,q.full())
print(q.get())
print(q.get())
print("檢視當前佇列長度: ", q.qsize())
print(q.get())
3. class queue.PriorityQueue(maxsize=0) 優先順序佇列
def f1():
pass
class Animal:
pass
a = Animal()
q = queue.PriorityQueue(3)
q.put((1, Animal))
q.put((3, f1))
q.put((5, a))
print(q.get())
print(q.get())
print(q.get())
這三種佇列都是執行緒安全的,不會出現多個執行緒搶佔同一個資源或資料的情況。
四. 執行緒池 concurrent.futures 模組
早期的時候我們沒有執行緒池,現在python提供了一個新的標準或者說內建的模組,這個模組裡面提供了新的執行緒池和程序池,之前我們說的程序池是在multiprocessing裡面的,現在這個在這個新的模組裡面,他倆用法上是一樣的。
為什麼要將程序池和執行緒池放到一起呢,是為了統一使用方式,使用threadPollExecutor和ProcessPollExecutor的方式一樣,而且只要通過這個concurrent.futures匯入就可以直接用他們兩個了:
concurrent.futures模組提供了高度封裝的非同步呼叫介面
ThreadPoolExecutor:執行緒池,提供非同步呼叫
ProcessPoolExecutor: 程序池,提供非同步呼叫
Both implement the same interface, which is defined by the abstract Executor class.
1. 基本方法
#submit(fn, *args, **kwargs)
非同步提交任務
#map(func, *iterables, timeout=None, chunksize=1)
取代for迴圈submit的操作
#shutdown(wait=True)
相當於程序池的pool.close()+pool.join()操作
wait=True,等待池內所有任務執行完畢回收完資源後才繼續
wait=False,立即返回,並不會等待池內的任務執行完畢
但不管wait引數為何值,整個程式都會等到所有任務執行完畢
submit和map必須在shutdown之前
#result(timeout=None)
取得結果
#add_done_callback(fn)
回撥函式
2. 執行緒池的簡單使用
import time, os, threading
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def func(n):
time.sleep(1)
print("%s列印的: %s"%(threading.get_ident(), n))
return n*n
# 預設一般起執行緒個數不超過CPU個數*5
tpool = ThreadPoolExecutor(max_workers=5)
# 非同步執行
t_list = []
for i in range(5):
# 提交執行函式,返回一個結果物件,i作為任務函式的引數 def submit(self, fn, *args, **kwargs):可以傳任意形式的引數
t = tpool.submit(func, i)
t_list.append(t)
# print(t.result())
# 這個返回的結果物件t,不能直接去拿結果,不然又變成串行了,可以理解為拿到一個號碼,等所有執行緒的結果都出來之後,我們再去通過結果物件t獲取結果
tpool.shutdown()
# 起到原來的close阻止新任務進來 + join的作用,等待所有的執行緒執行完畢
print("主執行緒")
for ti in t_list:
print(">>>", ti.result())
# 我們還可以不用shutdown(),用下面這種方式
# while 1:
# for n,ti in enumerate(t_list):
# print('>>>>', ti.result(),n)
# time.sleep(1)
#每個兩秒去去一次結果,哪個有結果了,就可以取出哪一個,想表達的意思就是說不用等到所有的結果都出來再去取,可以輪詢著去取結果,因為你的任務需要執行的時間很長,那麼你需要等很久才能拿到結果,通過這樣的方式可以將快速出來的結果先拿出來。如果有的結果物件裡面還沒有執行結果,那麼你什麼也取不到,這一點要注意,不是空的,是什麼也取不到,那怎麼判斷我已經取出了哪一個的結果,可以通過列舉enumerate來搞,記錄你是哪一個位置的結果物件的結果已經被取過了,取過的就不再取了
#結果分析: 列印的結果是沒有順序的,因為到了func函式中的sleep的時候執行緒會切換,誰先列印就沒準兒了,但是最後的我們通過結果物件取結果的時候拿到的是有序的,因為我們主執行緒進行for迴圈的時候,我們是按順序將結果物件新增到列表中的。
# 6696列印的: 0
# 5044列印的: 3
# 4424列印的: 2
# 8840列印的: 1
# 1244列印的: 4
# 主執行緒
# >>> 0
# >>> 1
# >>> 4
# >>> 9
# >>> 16
3. 執行緒池的簡單使用
只需要將這一行程式碼改為下面這一行就可以了,其他的程式碼都不用變
# tpool = ThreadPoolExecutor(max_workers=5)
tpool = ProcessPoolExecutor(max_workers=4)
#預設一般起程序的資料不超過CPU個數
4. map的使用
import time, os, random, threading
from concurrent.futures import ThreadPoolExecutor
def work(n):
print("%s is running"%threading.get_ident())
time.sleep(random.randint(1,3))
return n**2
if __name__ == '__main__':
t = ThreadPoolExecutor(max_workers=4)
# for i in range(10):
# t.submit(work, i)
# 用map取代submit
s = t.map(work, range(5))
print([i for i in s])
# 5792 is running
# 5824 is running
# 9208 is running
# 1296 is running
# 5792 is running
# [0, 1, 4, 9, 16]
5. 回撥函式
五. 協成
1. 背景
對於單執行緒下,我們不可避免程式中出現io操作,但如果我們能在自己的程式中(即使用者程式級別,而非作業系統級別)控制單執行緒下的多個任務能在一個任務遇到io阻塞時就切換到另外一個任務去計算,這樣就保證了該執行緒能夠最大限度地處於就緒態,即隨時都可以被cpu執行的狀態,相當於我們在使用者程式級別將自己的io操作最大限度地隱藏起來,從而可以迷惑作業系統,讓其看到:該執行緒好像是一直在計算,io比較少,從而更多的將cpu的執行許可權分配給我們的執行緒。
協程的本質就是在單執行緒下,由使用者自己控制一個任務遇到io阻塞了就切換另外一個任務去執行,以此來提升效率。為了實現它,我們需要找尋一種可以同時滿足以下條件的解決方案:
1). 可以控制多個任務之間的切換,切換之前將任務的狀態儲存下來,以便重新執行時,可以基於暫停的位置繼續執行。
2). 作為1的補充:可以檢測io操作,在遇到io操作的情況下才發生切換
2. 協成介紹
協程:是單執行緒下的併發,又稱微執行緒,纖程。英文名Coroutine。一句話說明什麼是執行緒:協程是一種使用者態的輕量級執行緒,即協程是由使用者程式自己控制排程的。、
需要強調的是:
1). python的執行緒屬於核心級別的,即由作業系統控制排程(如單執行緒遇到io或執行時間過長就會被迫交出cpu執行許可權,切換其他執行緒執行)
2). 單執行緒內開啟協程,一旦遇到io,就會從應用程式級別(而非作業系統)控制切換,以此來提升效率(!!!非io操作的切換與效率無關)
對比作業系統控制執行緒的切換,使用者在單執行緒內控制協程的切換
優點如下:
1). 協程的切換開銷更小,屬於程式級別的切換,作業系統完全感知不到,因而更加輕量級
2). 單執行緒內就可以實現併發的效果,最大限度地利用cpu
缺點如下:
1). 協程的本質是單執行緒下,無法利用多核,可以是一個程式開啟多個程序,每個程序內開啟多個執行緒,每個執行緒內開啟協程
2). 協程指的是單個執行緒,因而一旦協程出現阻塞,將會阻塞整個執行緒
總結協程特點:
1). 必須在只有一個單執行緒裡實現併發
2). 修改共享資料不需加鎖
3). 使用者程式裡自己儲存多個控制流的上下文棧
4). 附加:一個協程遇到IO操作自動切換到其它協程(如何實現檢測IO,yield、greenlet都無法實現,就用到了gevent模組(select機制))
3. greenlet
如果我們在單個執行緒內有20個任務,要想實現在多個任務之間切換,使用yield生成器的方式過於麻煩(需要先得到初始化一次的生成器,然後再呼叫send。。。非常麻煩),而使用greenlet模組可以非常簡單地實現這20個任務直接的切換
#真正的協程模組就是使用greenlet完成的切換
from greenlet import greenlet
def eat(name):
print('%s eat 1' %name) #2
g2.switch('taibai') #3
print('%s eat 2' %name) #6
g2.switch() #7
def play(name):
print('%s play 1' %name) #4
g1.switch() #5
print('%s play 2' %name) #8
g1=greenlet(eat)
g2=greenlet(play)
g1.switch('taibai')#可以在第一次switch時傳入引數,以後都不需要 1
#單純的切換(在沒有io的情況下或者沒有重複開闢記憶體空間的操作),反而會降低程式的執行速度
#順序執行
import time
def f1():
res=1
for i in range(100000000):
res+=i
def f2():
res=1
for i in range(100000000):
res*=i
start=time.time()
f1()
f2()
stop=time.time()
print('run time is %s' %(stop-start)) #10.985628366470337
#切換
from greenlet import greenlet
import time
def f1():
res=1
for i in range(100000000):
res+=i
g2.switch()
def f2():
res=1
for i in range(100000000):
res*=i
g1.switch()
start=time.time()
g1=greenlet(f1)
g2=greenlet(f2)
g1.switch()
stop=time.time()
print('run time is %s' %(stop-start)) # 52.763017892837524
greenlet只是提供了一種比generator更加便捷的切換方式,當切到一個任務執行時如果遇到io,那就原地阻塞,仍然是沒有解決遇到IO自動切換來提升效率的問題。
雖然沒有規避固有的I/O時間,但是我們使用這個時間來做別的事情了,一般在工作中我們都是程序+執行緒+協程的方式來實現併發,以達到最好的併發效果,如果是4核的cpu,一般起5個程序,每個程序中20個執行緒(5倍cpu數量),每個執行緒可以起500個協程,大規模爬取頁面的時候,等待網路延遲的時間的時候,我們就可以用協程去實現併發。 併發數量 = 5 * 20 * 500 = 50000個併發,這是一般一個4cpu的機器最大的併發數。nginx在負載均衡的時候最大承載量就是5w個
單執行緒裡的這20個任務的程式碼通常會既有計算操作又有阻塞操作,我們完全可以在執行任務1時遇到阻塞,就利用阻塞的時間去執行任務2。。。。如此,才能提高效率,這就用到了Gevent模組。
4. gevent模組
Gevent 是一個第三方庫,可以輕鬆通過gevent實現併發同步或非同步程式設計,在gevent中用到的主要模式是Greenlet, 它是以C擴充套件模組形式接入Python的輕量級協程。 Greenlet全部執行在主程式作業系統程序的內部,但它們被協作式地排程。
1). 用法
g1=gevent.spawn(func,1,2,3,x=4,y=5)建立一個協程物件g1,spawn括號內第一個引數是函式名,如eat,後面可以有多個引數,可以是位置實參或關鍵字實參,都是傳給函式eat的,spawn是非同步提交任務
g2=gevent.spawn(func2)
g1.join() #等待g1結束,上面只是建立協程物件,這個join才是去執行
g2.join() #等待g2結束 有人測試的時候會發現,不寫第二個join也能執行g2,是的,協程幫你切換執行了,但是你會發現,如果g2裡面的任務執行的時間長,但是不寫join的話,就不會執行完等到g2剩下的任務了
#或者上述兩步合作一步:gevent.joinall([g1,g2])
g1.value#拿到func1的返回值
2). 遇到I/O阻塞會自動切換任務
import gevent
def eat(name):
print("%s eat 1"% name)
gevent.sleep(2)
print("%s eat 2"% name)
def play(name):
print("%s play 1" % name)
gevent.sleep(1)
print("%s play 2" % name)
g1 = gevent.spawn(eat, "egon")
g2 = gevent.spawn(play, "egon")
g1.join()
g2.join()
# gevent.spawn([g1, g2])
print("主")
# egon eat 1
# egon play 1
# egon play 2
# egon eat 2
# 主
上例gevent.sleep(2)模擬的是gevent可以識別的io阻塞,
而time.sleep(2)或其他的阻塞,gevent是不能直接識別的需要用下面一行程式碼,打補丁,就可以識別了
from gevent import monkey;monkey.patch_all()必須放到被打補丁者的前面,如time,socket模組之前
或者我們乾脆記憶成:要用gevent,需要將from gevent import monkey;monkey.patch_all()放到檔案的開頭
我們可以用threading.current_thread().getName()來檢視每個g1和g2,檢視的結果為DummyThread-n,即假執行緒,虛擬執行緒,其實都在一個執行緒裡面
程序執行緒的任務切換是由作業系統自行切換的,你自己不能控制
協程是通過自己的程式(程式碼)來進行切換的,自己能夠控制,只有遇到協程模組能夠識別的IO操作的時候,程式才會進行任務切換,實現併發效果,如果所有程式都沒有IO操作,那麼就基本屬於序列執行了。
5. 協成子同步與非同步
from gevent import spawn, joinall,monkey; monkey.patch_all()
import time
def task(pid):
"""
Some non-deterministic task
"""
time.sleep(0.5)
print('Task %s done' % pid)
# 同步
def synchronous():
for i in range(10):
task(i)
# spawn()非同步提交任務
def asynchronous():
g_l = [spawn(task, i) for i in range(10)]
joinall(g_l)
if __name__ == '__main__':
print("Synchronous:")
synchronous()
print("Asynchronous:")
asynchronous()
# 上面程式的重要部分是將task函式封裝到Greenlet內部執行緒的gevent.spawn。 初始化的greenlet列表存放在陣列threads中,此陣列被傳給gevent.joinall 函式,後者阻塞當前流程,並執行所有給定的greenlet。執行流程只會在 所有greenlet執行完後才會繼續向下走。
6. gevent應用之一: 爬蟲
from gevent import monkey; monkey.patch_all()
import gevent
import requests
import time
def get_page(url):
print("GET: %s"%url)
response = requests.get(url)
# print(response.status_code)
if response.status_code == 200:
print("%d bytes receved from %s"%(len(response.text), url))
# print(response.text)
s = time.time()
gevent.joinall([
gevent.spawn(get_page, 'https://www.python.org/'),
gevent.spawn(get_page, 'https://www.yahoo.com/'),
gevent.spawn(get_page, 'https://github.com/'),
])
e = time.time()
print("run time is %s" % (e-s))
# GET: https://www.python.org/
# GET: https://www.yahoo.com/
# GET: https://github.com/
# 48862 bytes receved from https://www.python.org/
# 79878 bytes receved from https://github.com/
# 518555 bytes receved from https://www.yahoo.com/
# run time is 10.245655298233032
# 將上面的程式最後加上一段序列的程式碼看看效率:如果你的程式不需要太高的效率,那就不用什麼併發啊協程啊之類的東西。
print('--------------------------------')
s = time.time()
requests.get('https://www.python.org/')
requests.get('https://www.yahoo.com/')
requests.get('https://github.com/')
t = time.time()
print('序列時間>>',t-s)
# --------------------------------
# 序列時間>> 13.477648973464966
7. gevent應用之二: 實現單執行緒下的socket併發
通過gevent實現單執行緒下的socket併發(from gevent import monkey;monkey.patch_all()一定要放到匯入socket模組之前,否則gevent無法識別socket的阻塞)
一個網路請求裡面經過多個時間延遲time
1). 服務端:
from gevent import monkey; monkey.patch_all()
from socket import *
import gevent
#如果不想用money.patch_all()打補丁,可以用gevent自帶的socket
# from gevent import socket
# s=socket.socket()
def server(server_ip, port):
s = socket(AF_INET, SOCK_STREAM)
s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
s.bind((server_ip, port))
s.listen(5)
while 1:
conn, addr = s.accept()
gevent.spawn(talk, conn, addr)
def talk(conn, addr):
try:
while 1:
res = conn.recv(1024).decode("utf-8")
print("client %s:%s msg: %s"%(addr[0], addr[1], res))
# msg = input(">>>: ").strip()
msg = res.upper()
conn.send(msg.encode("utf-8"))
except Exception as e:
print(e)
finally:
conn.close()
if __name__ == '__main__':
server("127.0.0.1", 8080)
2). 客戶端
from socket import *
c = socket(AF_INET, SOCK_STREAM)
c.connect(("127.0.0.1", 8080))
while 1:
msg = input(">>>: ").strip()
if not msg:
continue
c.send(msg.encode("utf-8"))
msg2 = c.recv(1024)
print("客戶端: ", msg2.decode("utf-8"))
3). 多執行緒併發多個客戶端,去請求上面的服務端是沒問題的
from threading import Thread
from socket import *
import threading
def client(server_ip, port):
c = socket(AF_INET, SOCK_STREAM)
c.connect((server_ip, port))
count = 0
while 1:
c.send(("%s say hello %s" % (threading.current_thread().getName(), count)).encode("utf-8"))
msg = c.recv(1024)
print("server: ", msg.decode("utf-8"))
count += 1
if __name__ == '__main__':
for i in range(500):
t = Thread(target=client, args=("127.0.0.1", 8080))
t.start()