Python之旅12:執行緒、程序和協程
本章內容:
- 執行緒(執行緒鎖、threading.Event、queue 佇列、生產者消費者模型、自定義執行緒池)
- 程序(資料共享、程序池)
- 協程
概念:
1、程序:本質上就是一段程式的執行過程(抽象概念)
2、執行緒:最小的執行單元
3、程序:最小的資源單位
4、程序在執行過程中擁有獨立的記憶體單元,而多個執行緒共享記憶體。
5、程序是系統進行資源分配和排程的一個獨立單位,執行緒是程序的一個實體,是CPU排程和分派的基本單位,執行緒自己基本上不擁有系統資源,只擁有一點在執行中必不可少的資源(如程式計數器,一組暫存器和和棧)但是它可與同屬一個程序的其他執行緒共享所擁有的全部資源。
一、執行緒
Threading用於提供執行緒相關的操作,執行緒是應用程式中工作的最小單元。
#!/usr/bin/env python # -*- coding:utf-8 -*- import threading import time def show(arg): time.sleep(1) print ('thread'+str(arg)) for i in range(10): t = threading.Thread(target=show, args=(i,)) t.start() print ('main thread stop')
上述程式碼建立了10個“前臺”執行緒,然後控制器就交給了CPU,CPU根據指定演算法進行排程,分片執行指令。
thread方法:
- t.start() : 啟用執行緒
- t.getName() : 獲取執行緒的名稱
- t.setName() : 設定執行緒的名稱
- t.name : 獲取或設定執行緒的名稱
- t.is_alive() : 判斷執行緒是否為啟用狀態
- t.isAlive() :判斷執行緒是否為啟用狀態
- t.setDaemon() 設定為後臺執行緒或前臺執行緒(預設:False);通過一個布林值設定執行緒是否為守護執行緒,必須在執行start()方法之前才可以使用。如果是後臺執行緒,主執行緒執行過程中,後臺執行緒也在進行,主執行緒執行完畢後,後臺執行緒不論成功與否,均停止;如果是前臺執行緒,主執行緒執行過程中,前臺執行緒也在進行,主執行緒執行完畢後,等待前臺執行緒也執行完成後,程式停止
- t.isDaemon() : 判斷是否為守護執行緒
- t.ident :獲取執行緒的識別符號。執行緒識別符號是一個非零整數,只有在呼叫了start()方法之後該屬性才有效,否則它只返回None
- t.join() :逐個執行每個執行緒,執行完畢後繼續往下執行,該方法使得多執行緒變得無意義
- t.run() :執行緒被cpu排程後自動執行執行緒物件的run方法
import threading
import time
class MyThread(threading.Thread):
def __init__(self,num):
threading.Thread.__init__(self)
self.num = num
def run(self):#定義每個執行緒要執行的函式
print("running on number:%s" %self.num)
time.sleep(3)
if __name__ == '__main__':
t1 = MyThread(1)
t2 = MyThread(2)
t1.start()
t2.start()
setDaemon(True) :
將執行緒設定為守護執行緒,必須在start()方法高用之前設定,如果不設定為守護執行緒程式會被無限掛起。這個方法基本和join是相反的。 當我們在程式執行中,執行一個主執行緒,如果主執行緒以建立一個子執行緒,主執行緒和子執行緒就兵分兩路,分別執行,那麼當主執行緒完成 想退出時,會檢驗子執行緒是否完成。如果執行緒未完成,則主執行緒會等待了執行緒完成後再退出。但是有時候我們需要的是隻要主執行緒完 成,不管子執行緒是否完成,都要和主執行緒一起退出,這時就可以用setDaemon方法了
threading.activeCount():返回正在執行的執行緒數量,與len(threading.enumerate())有相同的結果
import threading
from time import ctime,sleep
import time
def ListenMusic(name):
print ("Begin listening to %s. %s" %(name,ctime()))
sleep(3)
print("end listening %s"%ctime())
def RecordBlog(title):
print ("Begin recording the %s! %s" %(title,ctime()))
sleep(5)
print('end recording %s'%ctime())
threads = []
t1 = threading.Thread(target=ListenMusic,args=('水手',))
t2 = threading.Thread(target=RecordBlog,args=('python執行緒',))
threads.append(t1)
threads.append(t2)
if __name__ == '__main__':
#t1.setDaemon(True)
t2.setDaemon(True)
for t in threads:
#t.setDaemon(True) #注意:一定在start之前設定
t.start()
print(t.getName())
print("count:",threading.active_count())
while threading.active_count()==1:
print ("all over %s" %ctime())
執行緒鎖(threading.RLock & threading.Lock)
我們使用執行緒對資料進行操作的時候,如果多個執行緒同時修改某個資料,可能會出現不可預料的結果,為了保證資料的準確性,引入了鎖的概念。
import threading
import time
num = 0
lock = threading.RLock() # 例項化鎖類
def work():
lock.acquire() # 加鎖
global num
num += 1
time.sleep(1)
print(num)
lock.release() # 解鎖
for i in range(10):
t = threading.Thread(target=work)
t.start()
threading.RLock和threading.Lock 的區別
RLock允許在同一執行緒中被多次acquire。而Lock卻不允許這種情況。 如果使用RLock,那麼acquire和release必須成對出現,即呼叫了n次acquire,必須呼叫n次的release才能真正釋放所佔用的鎖。
import threading
lock = threading.Lock()
lock.acquire()
lock.acquire() # 產生死鎖
lock.release()
lock.release()
import threading
rlock = threading.RLock()
rlock.acquire()
rlock.acquire() # 在同一執行緒內,程式不會堵塞。
rlock.release()
rlock.release()
print("end.")
訊號量(Semaphore)
互斥鎖 同時只允許一個執行緒更改資料,而Semaphore是同時允許一定數量的執行緒更改資料 ,比如廁所有3個坑,那最多隻允許3個人上廁所,後面的人只能等裡面有人出來了才能再進去。
import threading,time
def run(n):
semaphore.acquire()
time.sleep(1)
print("run the thread: %s" %n)
semaphore.release()
if __name__ == '__main__':
num= 0
semaphore = threading.BoundedSemaphore(5) #最多允許5個執行緒同時執行
for i in range(20):
t = threading.Thread(target=run,args=(i,))
t.start()
事件(event)
python執行緒的事件用於主執行緒控制其他執行緒的執行,事件主要提供了三個方法 set、wait、clear。
事件處理的機制:全域性定義了一個“Flag”,如果“Flag”值為 False,那麼當程式執行 event.wait 方法時就會阻塞,如果“Flag”值為True,那麼event.wait 方法時便不再阻塞。
- clear:將“Flag”設定為False
- set:將“Flag”設定為True
- Event.wait([timeout]) : 堵塞執行緒,直到Event物件內部標識位被設為True或超時(如果提供了引數timeout)
- Event.isSet() :判斷標識位是否為Ture
import threading
def do(event):
print('start')
event.wait()
print('execute')
event_obj = threading.Event()
for i in range(10):
t = threading.Thread(target=do, args=(event_obj,))
t.start()
event_obj.clear()
inp = input('input:')
if inp == 'true':
event_obj.set()
當執行緒執行的時候,如果flag為False,則執行緒會阻塞,當flag為True的時候,執行緒不會阻塞。它提供了本地和遠端的併發性。
Condition
Python提供的Condition物件提供了對複雜執行緒同步問題的支援。Condition被稱為條件變數,除了提供與Lock類似的acquire和release方法外,還提供了wait和notify方法。執行緒首先acquire一個條件變數,然後判斷一些條件。如果條件不滿足則wait;如果條件滿足,進行一些處理改變條件後,通過notify方法通知其他執行緒,其他處於wait狀態的執行緒接到通知後會重新判斷條件。不斷的重複這一過程,從而解決複雜的同步問題。
在典型的設計風格里,利用condition變數用鎖去通許訪問一些共享狀態,執行緒在獲取到它想得到的狀態前,會反覆呼叫wait()。修改狀態的執行緒在他們狀態改變時呼叫 notify() or notify_all(),用這種方式,執行緒會盡可能的獲取到想要的一個等待者狀態。
import threading
import time
def consumer(cond):
with cond:
print("consumer before wait")
cond.wait()
print("consumer after wait")
def producer(cond):
with cond:
print("producer before notifyAll")
cond.notifyAll()
print("producer after notifyAll")
condition = threading.Condition()
c1 = threading.Thread(name="c1", target=consumer, args=(condition,))
c2 = threading.Thread(name="c2", target=consumer, args=(condition,))
p = threading.Thread(name="p", target=producer, args=(condition,))
c1.start()
time.sleep(2)
c2.start()
time.sleep(2)
p.start()
# consumer()執行緒要等待producer()設定了Condition之後才能繼續。
Condition使得執行緒等待,只有滿足某條件時,才釋放n個執行緒
import threading
def run(n):
con.acquire()
con.wait()
print("run the thread:%s " % n)
con.release()
if __name__ == "__main__":
con = threading.Condition()
for i in range(10):
t = threading.Thread(target=run, args=(i,))
t.start()
while True:
inp = input(">>>")
if inp == 'q':
break
con.acquire()
con.notify(int(inp))
con.release()
import threading
def condition_func():
ret = False
inp = input('>>>')
if inp == '1':
ret = True
return ret
def run(n):
con.acquire()
con.wait_for(condition_func)
print("run the thread: %s" %n)
con.release()
if __name__ == '__main__':
con = threading.Condition()
for i in range(10):
t = threading.Thread(target=run, args=(i,))
t.start()
queue 佇列
適用於多執行緒程式設計的先進先出資料結構,可以用來安全的傳遞多執行緒資訊。
queue 方法:
- q = queue.Queue(maxsize=0) # 構造一個先進顯出佇列,maxsize指定佇列長度,為0 時,表示佇列長度無限制。
- q.join() # 等到佇列為kong的時候,在執行別的操作
- q.qsize() # 返回佇列的大小 (不可靠)
- q.empty() # 當佇列為空的時候,返回True 否則返回False (不可靠)
- q.full() # 當佇列滿的時候,返回True,否則返回False (不可靠)
- q.put(item, block=True, timeout=None) # 將item放入Queue尾部,item必須存在,可以引數block預設為True,表示當佇列滿時,會等待佇列給出可用位置,為False時為非阻塞,此時如果佇列已滿,會引發queue.Full 異常。 可選引數timeout,表示 會阻塞設定的時間,過後,如果佇列無法給出放入item的位置,則引發 queue.Full 異常
- q.get(block=True, timeout=None) # 移除並返回佇列頭部的一個值,可選引數block預設為True,表示獲取值的時候,如果佇列為空,則阻塞,為False時,不阻塞,若此時佇列為空,則引發 queue.Empty異常。 可選引數timeout,表示會阻塞設定的時候,過後,如果佇列為空,則引發Empty異常。
- q.put_nowait(item) # 等效於 put(item,block=False)
- q.get_nowait() # 等效於 get(item,block=False)
生產者消費者模型
import queue
import threading
que = queue.Queue(10)
def put(i):
que.put(i)
# print("size:", que.qsize())
def get(i):
get = que.get(i)
print("get:", get)
for i in range(1, 13):
t = threading.Thread(target=put, args=(i,))
t.start()
for i in range(1, 11):
t = threading.Thread(target=get, args=(i,))
t.start()
print("size:", que.qsize())
import queue
import threading
import time
import random
message = queue.Queue(10)
def product(num):
for i in range(num):
message.put(i)
print('將{}新增到佇列中'.format(i))
time.sleep(random.randrange(0, 1))
def consume(num):
count = 0
while count<num:
i = message.get()
print('將{}從佇列取出'.format(i))
time.sleep(random.randrange(1, 2))
count += 1
t1 = threading.Thread(target=product, args=(10, ))
t1.start()
t2 = threading.Thread(target=consume, args=(10, ))
t2.start()
自定義執行緒池:
# 自定義執行緒池(一)
import queue
import threading
import time
class TreadPool:
def __init__(self, max_num=20):
self.queue = queue.Queue(max_num)
for i in range(max_num):
self.queue.put(threading.Thread)
def get_thread(self):
return self.queue.get()
def add_thread(self):
self.queue.put(threading.Thread)
def func(pool, n):
time.sleep(1)
print(n)
pool.add_thread()
p = TreadPool(10)
for i in range(1, 100):
thread = p.get_thread()
t = thread(target=func, args=(p, i,))
t.start()
# 執行緒池(二)
import queue
import threading
import contextlib
import time
StopEvent = object()
class Threadpool:
def __init__(self, max_num=10):
self.q = queue.Queue()
self.max_num = max_num
self.terminal = False
self.generate_list = [] # 以建立執行緒列表
self.free_list = [] # 以建立的執行緒空閒列表
def run(self, func, args, callback=None):
"""
執行緒池執行一個任務
:param func: 任務函式
:param args: 任務函式所需引數
:param callback: 任務執行失敗或成功後執行的回撥函式,回撥函式有兩個引數1、任務函式執行狀態;2、任務函式返回值(預設為None,即:不執行回撥函式)
:return: 如果執行緒池已經終止,則返回True否則None
"""
if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
self.generate_thread()
w = (func, args, callback,)
self.q.put(w)
def generate_thread(self):
"""
建立一個執行緒
"""
t = threading.Thread(target=self.call)
t.start()
def call(self):
"""
迴圈去獲取任務函式並執行任務函式
"""
current_thread = threading.currentThread # 當前執行緒
self.generate_list.append(current_thread)
event = self.q.get()
while event != StopEvent:
func, arguments, callback = event
try:
result = func(*arguments)
status = True
except Exception as e:
status = False
result = e
if callback is not None:
try:
callback(status, result)
except Exception as e:
pass
if self.terminal:
event = StopEvent
else:
with self.worker_state(self.free_list, current_thread):
event = self.q.get()
# self.free_list.append(current_thread)
# event = self.q.get()
# self.free_list.remove(current_thread)
else:
self.generate_list.remove(current_thread)
def close(self):
"""
執行完所有的任務後,所有執行緒停止
"""
num = len(self.generate_list)
while num:
self.q.put(StopEvent)
num -= 1
def terminate(self):
"""
無論是否還有任務,終止執行緒
"""
self.terminal = True
while self.generate_list:
self.q.put(StopEvent)
self.q.empty() # 清空佇列
@contextlib.contextmanager # with上下文管理
def worker_state(self, frelist, val):
"""
用於記錄執行緒中正在等待的執行緒數
"""
frelist.append(val)
try:
yield
finally:
frelist.remove(val)
def work(i):
time.sleep(1)
print(i)
pool = Threadpool()
for item in range(50):
pool.run(func=work, args=(item,))
pool.close()
# pool.terminate()
在寫程式碼之前,我們先來看一下該怎麼設計這樣一個執行緒池,上面的執行緒池,我們的佇列中,存的是執行緒類,我們每處理一個任務都例項化一個執行緒,然後執行完了之後,該執行緒就被丟棄了,這樣有點不合適。我們這次設計的時候,
- 佇列中存的不是執行緒類,而是任務,我們從佇列中拿取的都是任務
- 每次執行任務的時候,不是都要生成一個執行緒,而是如果以前生成的執行緒有空閒的話,就用以前的執行緒
- 支援回掉機制,支援close,terminate
下面來一下程式碼是怎麼實現的
import threading
import queue
import time
import contextlib
class ThreadingPool:
def __init__(self, num):
self.max = num
self.terminal = False
self.q = queue.Queue()
self.generate_list = [] # 儲存已經生成的執行緒
self.free_list = [] # 儲存那些已經完成任務的執行緒
def run(self, func, args=None, callbk=None):
self.q.put((func, args, callbk)) # 將任務資訊作為一個元祖放到佇列中去
if len(self.free_list) == 0 and len(self.generate_list) < self.max:
self.threadstart()
def threadstart(self):
t = threading.Thread(target=self.handel)
t.start()
def handel(self):
current_thread = threading.current_thread()
self.generate_list.append(current_thread)
event = self.q.get()
while event != 'stop':
func, args, callbk = event
flag = True
try:
ret = func(*args)
except Exception as e:
flag = False
ret = e
if callbk is not None:
try:
callbk(ret)
except Exception as e:
pass
if not self.terminal:
with self.auto_append_remove(current_thread):
event = self.q.get()
else:
event = 'stop'
else:
self.generate_list.remove(current_thread)
def terminate(self):
self.terminal = True
while self.generate_list:
self.q.put('stop')
self.q.empty()
def close(self):
num = len(self.generate_list)
while num:
self.q.put('stop')
num -= 1
@contextlib.contextmanager
def auto_append_remove(self, thread):
self.free_list.append(thread)
try:
yield
finally:
self.free_list.remove(thread)
def f(i):
# time.sleep(1)
return i
def f1(i):
print(i)
p = ThreadingPool(5)
for i in range(20):
p.run(func=f, args=(i,), callbk=f1)
p.close()
二、程序
執行緒的上一級就是程序,程序可包含很多執行緒,程序和執行緒的區別是程序間的資料不共享,多程序也可以用來處理多工,不過多程序很消耗資源,計算型的任務最好交給多程序來處理,IO密集型最好交給多執行緒來處理,此外程序的數量應該和cpu的核心說保持一致。
在windows中不能用fork來建立多程序,因此只能匯入multiprocessing,來模擬多程序,下面首先來看一下怎麼建立程序,大家可以先猜一下下面的結果是什麼
# 程序
import multiprocessing
l = []
def f(i):
l.append(i)
print('hi', l)
if __name__ == '__main__':
for i in range(10):
p = multiprocessing.Process(target=f, args=(i,)) # 資料不共享,建立10份 l列表
p.start()
注意:由於程序之間的資料需要各自持有一份,所以建立程序需要的非常大的開銷。
資料共享
不同程序間記憶體是不共享的,要想實現兩個程序間的資料交換,可以用以下方法:
Shared memory
資料可以用Value或Array儲存在一個共享記憶體地圖裡,如下:
from multiprocessing import Process, Value, Array
def f(a, b):
a.value = 3.111
for i in range(len(b)):
b[i] += 100
if __name__ == '__main__':
num = Value('f', 3.333) # 類似C語言中的 浮點型數
l = Array('i', range(10)) # 類似C語言中的整形陣列,長度為10
print(num.value)
print(l[:])
p = Process(target=f, args=(num, l))
p.start()
p.join()
print(num.value)# 大家自己執行一下,看下兩次列印結果是否一樣
print(l[:])
'''
結果:
3.3329999446868896
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
3.1110000610351562
[100, 101, 102, 103, 104, 105, 106, 107, 108, 109]
'''
建立num和l 時,“d”和“i”引數由Array模組使用的typecodes建立:“d”表示一個雙精度的浮點數,“i”表示一個有符號的整數,這些共享物件將被執行緒安全的處理。
'c': ctypes.c_char, 'u': ctypes.c_wchar,
'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
'h': ctypes.c_short, 'H': ctypes.c_ushort,
'i': ctypes.c_int, 'I': ctypes.c_uint,
'l': ctypes.c_long, 'L': ctypes.c_ulong,
'f': ctypes.c_float, 'd': ctypes.c_double
Server process
由Manager()返回的manager提供list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array型別的支援。
from multiprocessing import Process, Manager
def f(d, l):
d[1] = '1'
d['2'] = 2
d[0.25] = None
l.reverse()
if __name__ == '__main__':
with Manager() as manager:
d = manager.dict()
l = manager.list(range(10))
p = Process(target=f, args=(d, l))
p.start()
p.join()
print(d)
print(l)
# 輸出結果:
{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
Server process manager比 shared memory 更靈活,因為它可以支援任意的物件型別。另外,一個單獨的manager可以通過程序在網路上不同的計算機之間共享,不過他比shared memory要慢。
# manage.dict()共享資料
from multiprocessing import Process,Manager
manage = Manager()
dic = manage.dict()
def Foo(i):
dic[i] = 100+i
print (dic.values())
for i in range(2):
p = Process(target=Foo,args=(i,))
p.start()
p.join()
當建立程序時(非使用時),共享資料會被拿到子程序中,當程序中執行完畢後,再賦值給原值。
程序鎖例項
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Process, Array, RLock
def Foo(lock,temp,i):
"""
將第0個數加100
"""
lock.acquire()
temp[0] = 100+i
for item in temp:
print ('%s----->%s'%(i,item))
lock.release()
lock = RLock()
temp = Array('i', [11, 22, 33, 44])
for i in range(20):
p = Process(target=Foo,args=(lock,temp,i,))
p.start()
程序池
程序池內部維護一個程序序列,當使用時,則去程序池中獲取一個程序,如果程序池序列中沒有可供使用的進程序,那麼程式就會等待,直到程序池中有可用程序為止。
方法:
-
apply(func[, args[, kwds]]) :使用arg和kwds引數呼叫func函式,結果返回前會一直阻塞,由於這個原因,apply_async()更適合併發執行,另外,func函式僅被pool中的一個程序執行。
-
apply_async(func[, args[, kwds[, callback[, error_callback]]]]) : apply()方法的一個變體,會返回一個結果物件。如果callback被指定,那麼callback可以接收一個引數然後被呼叫,當結果準備好回撥時會呼叫callback,呼叫失敗時,則用error_callback替換callback。 Callbacks應被立即完成,否則處理結果的執行緒會被阻塞。
-
close() : 阻止更多的任務提交到pool,待任務完成後,工作程序會退出。
-
terminate() : 不管任務是否完成,立即停止工作程序。在對pool物件程序垃圾回收的時候,會立即呼叫terminate()。
-
join() : wait工作執行緒的退出,在呼叫join()前,必須呼叫close() or terminate()。這樣是因為被終止的程序需要被父程序呼叫wait(join等價與wait),否則程序會成為殭屍程序
程序池中有兩個方法:
- apply
- apply_async
from multiprocessing import Pool
import time
def myFun(i):
time.sleep(2)
print("mytfun",i)
return i+100
def end_call(arg):
print("end_call",arg)
if __name__ == "__main__":
p = Pool(5)
# print(p.map(myFun,range(10)))
for i in range(10):
p.apply_async(func=myFun,args=(i,),callback=end_call)
print("end")
p.close()
p.join()
官方例程:
from multiprocessing import Pool, TimeoutError
import time
import os
def f(x):
return x*x
if __name__ == '__main__':
# 建立4個程序
with Pool(processes=4) as pool:
# 列印 "[0, 1, 4,..., 81]"
print(pool.map(f, range(10)))
# 使用任意順序輸出相同的數字,
for i in pool.imap_unordered(f, range(10)):
print(i)
# 非同步執行"f(20)"
res = pool.apply_async(f, (20,)) # 只執行一個程序
print(res.get(timeout=1)) # 輸出 "400"
# 非同步執行 "os.getpid()"
res = pool.apply_async(os.getpid, ()) # 只執行一個程序
print(res.get(timeout=1)) # 輸出程序的 PID
# 執行多個非同步執行可能會使用多個程序
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print([res.get(timeout=1) for res in multiple_results])
# 是一個程序睡10秒
res = pool.apply_async(time.sleep, (10,))
try:
print(res.get(timeout=1))
except TimeoutError:
print("發現一個 multiprocessing.TimeoutError異常")
print("目前,池中還有其他的工作")
# 退出with塊中已經停止的池
print("Now the pool is closed and no longer available")
三、協程
執行緒和程序的操作是由程式觸發系統介面,最後的執行者是系統;協程的操作則是程式設計師。
協程存在的意義:對於多執行緒應用,CPU通過切片的方式來切換執行緒間的執行,執行緒切換時需要耗時(儲存狀態,下次繼續)。協程,則只使用一個執行緒,在一個執行緒中規定某個程式碼塊執行順序。
協程的適用場景:當程式中存在大量不需要CPU的操作時(IO),適用於協程;
# 安裝
pip install gevent
# 匯入模組
import gevent
greenlet
# greenlet
from greenlet import greenlet
def test1():
print(11)
gr2.switch()
print(22)
gr2.switch()
def test2():
print(33)
gr1.switch()
print(44)
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()
'''
# 輸出結果:
11
33
22
44
'''
gevent
# gevent
import gevent
def foo():
print("Running in foo")
gevent.sleep(0)
print("Explicit context switch to foo angin")
def bar():
print("Explicit context to bar")
gevent.sleep(0)
print("Implicit context swich back to bar")
gevent.joinall([
gevent.spawn(foo),
gevent.spawn(bar),
])
'''
# 輸出結果:
Running in foo
Explicit context to bar
Explicit context switch to foo angin
Implicit context swich back to bar
'''
遇到IO自動切換
from gevent import monkey
monkey.patch_all()
import gevent
import requests
def f(url):
print("FET: %s" % url)
resp = requests.get(url)
data = len(resp.text)
print(url, data)
gevent.joinall([
gevent.spawn(f, 'https://www.python.org/'),
gevent.spawn(f, 'https://www.yahoo.com/'),
gevent.spawn(f, 'https://github.com/'),
])
上面的例子,利用協程,一個執行緒完成所有的請求,發出請求的時候,不會等待回覆,而是一次性將所有的請求都發出求,收到一個回覆就處理一個回覆,這樣一個執行緒就解決了所有的事情,效率極高。