1. 程式人生 > >Python之旅12:執行緒、程序和協程

Python之旅12:執行緒、程序和協程

本章內容:

  • 執行緒(執行緒鎖、threading.Event、queue 佇列、生產者消費者模型、自定義執行緒池)
  • 程序(資料共享、程序池)
  • 協程

概念:

1、程序:本質上就是一段程式的執行過程(抽象概念)

2、執行緒:最小的執行單元

3、程序:最小的資源單位

4、程序在執行過程中擁有獨立的記憶體單元,而多個執行緒共享記憶體。

5、程序是系統進行資源分配和排程的一個獨立單位,執行緒是程序的一個實體,是CPU排程和分派的基本單位,執行緒自己基本上不擁有系統資源,只擁有一點在執行中必不可少的資源(如程式計數器,一組暫存器和和棧)但是它可與同屬一個程序的其他執行緒共享所擁有的全部資源。

一、執行緒

Threading用於提供執行緒相關的操作,執行緒是應用程式中工作的最小單元。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time
  
def show(arg):
    time.sleep(1)
    print ('thread'+str(arg))
  
for i in range(10):
    t = threading.Thread(target=show, args=(i,))
    t.start()
  
print ('main thread stop')

上述程式碼建立了10個“前臺”執行緒,然後控制器就交給了CPU,CPU根據指定演算法進行排程,分片執行指令。

thread方法:

  • t.start() : 啟用執行緒
  • t.getName() : 獲取執行緒的名稱
  • t.setName() : 設定執行緒的名稱 
  • t.name : 獲取或設定執行緒的名稱
  • t.is_alive() : 判斷執行緒是否為啟用狀態
  • t.isAlive() :判斷執行緒是否為啟用狀態
  • t.setDaemon() 設定為後臺執行緒或前臺執行緒(預設:False);通過一個布林值設定執行緒是否為守護執行緒,必須在執行start()方法之前才可以使用。如果是後臺執行緒,主執行緒執行過程中,後臺執行緒也在進行,主執行緒執行完畢後,後臺執行緒不論成功與否,均停止;如果是前臺執行緒,主執行緒執行過程中,前臺執行緒也在進行,主執行緒執行完畢後,等待前臺執行緒也執行完成後,程式停止
  • t.isDaemon() : 判斷是否為守護執行緒
  • t.ident :獲取執行緒的識別符號。執行緒識別符號是一個非零整數,只有在呼叫了start()方法之後該屬性才有效,否則它只返回None
  • t.join() :逐個執行每個執行緒,執行完畢後繼續往下執行,該方法使得多執行緒變得無意義
  • t.run() :執行緒被cpu排程後自動執行執行緒物件的run方法
import threading
import time
 
 
class MyThread(threading.Thread):
    def __init__(self,num):
        threading.Thread.__init__(self)
        self.num = num
 
    def run(self):#定義每個執行緒要執行的函式
 
        print("running on number:%s" %self.num)
 
        time.sleep(3)
 
if __name__ == '__main__':
 
    t1 = MyThread(1)
    t2 = MyThread(2)
    t1.start()
    t2.start()

setDaemon(True) :

      將執行緒設定為守護執行緒,必須在start()方法高用之前設定,如果不設定為守護執行緒程式會被無限掛起。這個方法基本和join是相反的。        當我們在程式執行中,執行一個主執行緒,如果主執行緒以建立一個子執行緒,主執行緒和子執行緒就兵分兩路,分別執行,那麼當主執行緒完成          想退出時,會檢驗子執行緒是否完成。如果執行緒未完成,則主執行緒會等待了執行緒完成後再退出。但是有時候我們需要的是隻要主執行緒完          成,不管子執行緒是否完成,都要和主執行緒一起退出,這時就可以用setDaemon方法了

 threading.activeCount()返回正在執行的執行緒數量,與len(threading.enumerate())有相同的結果

import threading
from time import ctime,sleep
import time

def ListenMusic(name):

        print ("Begin listening to %s. %s" %(name,ctime()))
        sleep(3)
        print("end listening %s"%ctime())


def RecordBlog(title):

        print ("Begin recording the %s! %s" %(title,ctime()))
        sleep(5)
        print('end recording %s'%ctime())

threads = []

t1 = threading.Thread(target=ListenMusic,args=('水手',))
t2 = threading.Thread(target=RecordBlog,args=('python執行緒',))

threads.append(t1)
threads.append(t2)

if __name__ == '__main__':
    #t1.setDaemon(True)
    t2.setDaemon(True)

    for t in threads:
        #t.setDaemon(True) #注意:一定在start之前設定
        t.start()
        print(t.getName())
        print("count:",threading.active_count())


    while threading.active_count()==1:

        print ("all over %s" %ctime())

 

執行緒鎖(threading.RLock & threading.Lock)

我們使用執行緒對資料進行操作的時候,如果多個執行緒同時修改某個資料,可能會出現不可預料的結果,為了保證資料的準確性,引入了鎖的概念。

import threading
import time
 
num = 0
 
lock = threading.RLock()    # 例項化鎖類
 
def work():
    lock.acquire()  # 加鎖
    global num
    num += 1
    time.sleep(1)
    print(num)
    lock.release()  # 解鎖
 
for i in range(10):
    t = threading.Thread(target=work)
    t.start()

threading.RLock和threading.Lock 的區別

RLock允許在同一執行緒中被多次acquire。而Lock卻不允許這種情況。 如果使用RLock,那麼acquire和release必須成對出現,即呼叫了n次acquire,必須呼叫n次的release才能真正釋放所佔用的鎖。

import threading
 
lock = threading.Lock()
lock.acquire()
lock.acquire()  # 產生死鎖
lock.release()
lock.release()
import threading
 
rlock = threading.RLock()
rlock.acquire()
rlock.acquire()      # 在同一執行緒內,程式不會堵塞。
rlock.release()
rlock.release()
print("end.")

訊號量(Semaphore)

互斥鎖 同時只允許一個執行緒更改資料,而Semaphore是同時允許一定數量的執行緒更改資料 ,比如廁所有3個坑,那最多隻允許3個人上廁所,後面的人只能等裡面有人出來了才能再進去。

import threading,time
 
def run(n):
    semaphore.acquire()
    time.sleep(1)
    print("run the thread: %s" %n)
    semaphore.release()
 
if __name__ == '__main__':
 
    num= 0
    semaphore  = threading.BoundedSemaphore(5) #最多允許5個執行緒同時執行
    for i in range(20):
        t = threading.Thread(target=run,args=(i,))
        t.start()

事件(event)

python執行緒的事件用於主執行緒控制其他執行緒的執行,事件主要提供了三個方法 set、wait、clear。

事件處理的機制:全域性定義了一個“Flag”,如果“Flag”值為 False,那麼當程式執行 event.wait 方法時就會阻塞,如果“Flag”值為True,那麼event.wait 方法時便不再阻塞。

  • clear:將“Flag”設定為False
  • set:將“Flag”設定為True
  • Event.wait([timeout]) : 堵塞執行緒,直到Event物件內部標識位被設為True或超時(如果提供了引數timeout)
  • Event.isSet() :判斷標識位是否為Ture
import threading
  
def do(event):
    print('start')
    event.wait()
    print('execute')
  
event_obj = threading.Event()
for i in range(10):
    t = threading.Thread(target=do, args=(event_obj,))
    t.start()
  
event_obj.clear()
inp = input('input:')
if inp == 'true':
    event_obj.set()

當執行緒執行的時候,如果flag為False,則執行緒會阻塞,當flag為True的時候,執行緒不會阻塞。它提供了本地和遠端的併發性。

 

Condition

Python提供的Condition物件提供了對複雜執行緒同步問題的支援。Condition被稱為條件變數,除了提供與Lock類似的acquire和release方法外,還提供了wait和notify方法。執行緒首先acquire一個條件變數,然後判斷一些條件。如果條件不滿足則wait;如果條件滿足,進行一些處理改變條件後,通過notify方法通知其他執行緒,其他處於wait狀態的執行緒接到通知後會重新判斷條件。不斷的重複這一過程,從而解決複雜的同步問題。

在典型的設計風格里,利用condition變數用鎖去通許訪問一些共享狀態,執行緒在獲取到它想得到的狀態前,會反覆呼叫wait()。修改狀態的執行緒在他們狀態改變時呼叫 notify() or notify_all(),用這種方式,執行緒會盡可能的獲取到想要的一個等待者狀態。

import threading
import time
def consumer(cond):
    with cond:
        print("consumer before wait")
        cond.wait()
        print("consumer after wait")

def producer(cond):
    with cond:
        print("producer before notifyAll")
        cond.notifyAll()
        print("producer after notifyAll")

condition = threading.Condition()
c1 = threading.Thread(name="c1", target=consumer, args=(condition,))
c2 = threading.Thread(name="c2", target=consumer, args=(condition,))

p = threading.Thread(name="p", target=producer, args=(condition,))

c1.start()
time.sleep(2)
c2.start()
time.sleep(2)
p.start()
# consumer()執行緒要等待producer()設定了Condition之後才能繼續。

Condition使得執行緒等待,只有滿足某條件時,才釋放n個執行緒

import threading

def run(n):
    con.acquire()
    con.wait()
    print("run the thread:%s " % n)
    con.release()

if __name__ == "__main__":

    con = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start()

    while True:
        inp = input(">>>")
        if inp == 'q':
            break
        con.acquire()
        con.notify(int(inp))
        con.release()
import threading

def condition_func():

    ret = False
    inp = input('>>>')
    if inp == '1':
        ret = True

    return ret


def run(n):
    con.acquire()
    con.wait_for(condition_func)
    print("run the thread: %s" %n)
    con.release()

if __name__ == '__main__':

    con = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start()

queue 佇列

適用於多執行緒程式設計的先進先出資料結構,可以用來安全的傳遞多執行緒資訊。

queue 方法:

  • q = queue.Queue(maxsize=0) # 構造一個先進顯出佇列,maxsize指定佇列長度,為0 時,表示佇列長度無限制。
  • q.join()   # 等到佇列為kong的時候,在執行別的操作
  • q.qsize()   # 返回佇列的大小 (不可靠)
  • q.empty()    # 當佇列為空的時候,返回True 否則返回False (不可靠)
  • q.full()     # 當佇列滿的時候,返回True,否則返回False (不可靠)
  • q.put(item, block=True, timeout=None) # 將item放入Queue尾部,item必須存在,可以引數block預設為True,表示當佇列滿時,會等待佇列給出可用位置,為False時為非阻塞,此時如果佇列已滿,會引發queue.Full 異常。 可選引數timeout,表示 會阻塞設定的時間,過後,如果佇列無法給出放入item的位置,則引發 queue.Full 異常
  • q.get(block=True, timeout=None) # 移除並返回佇列頭部的一個值,可選引數block預設為True,表示獲取值的時候,如果佇列為空,則阻塞,為False時,不阻塞,若此時佇列為空,則引發 queue.Empty異常。 可選引數timeout,表示會阻塞設定的時候,過後,如果佇列為空,則引發Empty異常。
  • q.put_nowait(item) # 等效於 put(item,block=False)
  • q.get_nowait()     # 等效於 get(item,block=False)

生產者消費者模型

import queue
import threading

que = queue.Queue(10)


def put(i):
    que.put(i)
    # print("size:", que.qsize())


def get(i):
    get = que.get(i)
    print("get:", get)


for i in range(1, 13):
    t = threading.Thread(target=put, args=(i,))
    t.start()

for i in range(1, 11):
    t = threading.Thread(target=get, args=(i,))
    t.start()

print("size:", que.qsize())
import queue
import threading
import time
import random

message = queue.Queue(10)

def product(num):
    for i in range(num):
        message.put(i)
        print('將{}新增到佇列中'.format(i))
        time.sleep(random.randrange(0, 1))


def consume(num):
    count = 0
    while count<num:
        i = message.get()
        print('將{}從佇列取出'.format(i))
        time.sleep(random.randrange(1, 2))
        count += 1


t1 = threading.Thread(target=product, args=(10, ))
t1.start()

t2 = threading.Thread(target=consume, args=(10, ))
t2.start()

自定義執行緒池:

# 自定義執行緒池(一)
import queue
import threading
import time

class TreadPool:

    def __init__(self, max_num=20):
        self.queue = queue.Queue(max_num)
        for i in range(max_num):
            self.queue.put(threading.Thread)

    def get_thread(self):
        return self.queue.get()

    def add_thread(self):
        self.queue.put(threading.Thread)

def func(pool, n):
    time.sleep(1)
    print(n)
    pool.add_thread()

p = TreadPool(10)
for i in range(1, 100):
    thread = p.get_thread()
    t = thread(target=func, args=(p, i,))
    t.start()

 


# 執行緒池(二)
import queue
import threading
import contextlib
import time

StopEvent = object()

class Threadpool:

    def __init__(self, max_num=10):
        self.q = queue.Queue()
        self.max_num = max_num

        self.terminal = False
        self.generate_list = []     # 以建立執行緒列表
        self.free_list = []         # 以建立的執行緒空閒列表

    def run(self, func, args, callback=None):
        """
        執行緒池執行一個任務
        :param func: 任務函式
        :param args: 任務函式所需引數
        :param callback: 任務執行失敗或成功後執行的回撥函式,回撥函式有兩個引數1、任務函式執行狀態;2、任務函式返回值(預設為None,即:不執行回撥函式)
        :return: 如果執行緒池已經終止,則返回True否則None
        """
        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
            self.generate_thread()
        w = (func, args, callback,)
        self.q.put(w)

    def generate_thread(self):
        """
        建立一個執行緒
        """
        t = threading.Thread(target=self.call)
        t.start()

    def call(self):
        """
        迴圈去獲取任務函式並執行任務函式
        """
        current_thread = threading.currentThread    # 當前執行緒
        self.generate_list.append(current_thread)

        event = self.q.get()
        while event != StopEvent:

            func, arguments, callback = event
            try:
                result = func(*arguments)
                status = True
            except Exception as e:
                status = False
                result = e

            if callback is not None:
                try:
                    callback(status, result)
                except Exception as e:
                    pass

            if self.terminal:
                event = StopEvent
            else:
                with self.worker_state(self.free_list, current_thread):
                    event = self.q.get()
                # self.free_list.append(current_thread)
                # event = self.q.get()
                # self.free_list.remove(current_thread)

        else:
            self.generate_list.remove(current_thread)

    def close(self):
        """
        執行完所有的任務後,所有執行緒停止
        """
        num = len(self.generate_list)
        while num:
            self.q.put(StopEvent)
            num -= 1

    def terminate(self):
        """
        無論是否還有任務,終止執行緒
        """
        self.terminal = True
        while self.generate_list:
            self.q.put(StopEvent)
        self.q.empty()  # 清空佇列

    @contextlib.contextmanager      # with上下文管理
    def worker_state(self, frelist, val):
        """
        用於記錄執行緒中正在等待的執行緒數
        """
        frelist.append(val)
        try:
            yield
        finally:
            frelist.remove(val)


def work(i):
    time.sleep(1)
    print(i)

pool = Threadpool()
for item in range(50):
    pool.run(func=work, args=(item,))
pool.close()
# pool.terminate()

在寫程式碼之前,我們先來看一下該怎麼設計這樣一個執行緒池,上面的執行緒池,我們的佇列中,存的是執行緒類,我們每處理一個任務都例項化一個執行緒,然後執行完了之後,該執行緒就被丟棄了,這樣有點不合適。我們這次設計的時候,

  1. 佇列中存的不是執行緒類,而是任務,我們從佇列中拿取的都是任務
  2. 每次執行任務的時候,不是都要生成一個執行緒,而是如果以前生成的執行緒有空閒的話,就用以前的執行緒
  3. 支援回掉機制,支援close,terminate

下面來一下程式碼是怎麼實現的

import threading
import queue
import time
import contextlib

class ThreadingPool:
    def __init__(self, num):
        self.max = num
        self.terminal = False
        self.q = queue.Queue()
        self.generate_list = []         # 儲存已經生成的執行緒
        self.free_list = []             # 儲存那些已經完成任務的執行緒

    def run(self, func, args=None, callbk=None):
        self.q.put((func, args, callbk))            # 將任務資訊作為一個元祖放到佇列中去
        if len(self.free_list) == 0 and len(self.generate_list) < self.max:
           self.threadstart()

    def threadstart(self):
        t = threading.Thread(target=self.handel)
        t.start()

    def handel(self):
        current_thread = threading.current_thread()
        self.generate_list.append(current_thread)
        event = self.q.get()
        while event != 'stop':
            func, args, callbk = event
            flag = True
            try:
                ret = func(*args)
            except Exception as e:
                flag = False
                ret = e

            if callbk is not None:
                try:
                    callbk(ret)
                except Exception as e:
                    pass

            if not self.terminal:
                with self.auto_append_remove(current_thread):
                    event = self.q.get()
            else:
                event = 'stop'
        else:
            self.generate_list.remove(current_thread)

    def terminate(self):
        self.terminal = True

        while self.generate_list:
            self.q.put('stop')
        self.q.empty()

    def close(self):
        num = len(self.generate_list)
        while num:
            self.q.put('stop')
            num -= 1

    @contextlib.contextmanager
    def auto_append_remove(self, thread):
        self.free_list.append(thread)
        try:
            yield
        finally:
            self.free_list.remove(thread)

def f(i):
    # time.sleep(1)
    return i

def f1(i):
    print(i)

p = ThreadingPool(5)
for i in range(20):
    p.run(func=f, args=(i,), callbk=f1)

p.close()

 

二、程序

 

執行緒的上一級就是程序,程序可包含很多執行緒,程序和執行緒的區別是程序間的資料不共享,多程序也可以用來處理多工,不過多程序很消耗資源,計算型的任務最好交給多程序來處理,IO密集型最好交給多執行緒來處理,此外程序的數量應該和cpu的核心說保持一致。

在windows中不能用fork來建立多程序,因此只能匯入multiprocessing,來模擬多程序,下面首先來看一下怎麼建立程序,大家可以先猜一下下面的結果是什麼

# 程序
import multiprocessing
l = []

def f(i):
    l.append(i)
    print('hi', l)

if __name__ == '__main__':
    for i in range(10):
        p = multiprocessing.Process(target=f, args=(i,))        # 資料不共享,建立10份 l列表
        p.start()

注意:由於程序之間的資料需要各自持有一份,所以建立程序需要的非常大的開銷。

資料共享

不同程序間記憶體是不共享的,要想實現兩個程序間的資料交換,可以用以下方法:

Shared memory

資料可以用Value或Array儲存在一個共享記憶體地圖裡,如下:

from multiprocessing import Process, Value, Array

def f(a, b):
    a.value = 3.111
    for i in range(len(b)):
        b[i] += 100


if __name__ == '__main__':
    num = Value('f', 3.333) # 類似C語言中的 浮點型數
    l = Array('i', range(10)) # 類似C語言中的整形陣列,長度為10
    print(num.value)
    print(l[:])

    p = Process(target=f, args=(num, l))
    p.start()
    p.join()
    print(num.value)# 大家自己執行一下,看下兩次列印結果是否一樣
    print(l[:])

'''
結果:
3.3329999446868896
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
3.1110000610351562
[100, 101, 102, 103, 104, 105, 106, 107, 108, 109]

'''

建立num和l 時,“d”和“i”引數由Array模組使用的typecodes建立:“d”表示一個雙精度的浮點數,“i”表示一個有符號的整數,這些共享物件將被執行緒安全的處理。

    'c': ctypes.c_char,  'u': ctypes.c_wchar,
    'b': ctypes.c_byte,  'B': ctypes.c_ubyte,
    'h': ctypes.c_short, 'H': ctypes.c_ushort,
    'i': ctypes.c_int,   'I': ctypes.c_uint,
    'l': ctypes.c_long,  'L': ctypes.c_ulong,
    'f': ctypes.c_float, 'd': ctypes.c_double

Server process

由Manager()返回的manager提供list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array型別的支援。

from multiprocessing import Process, Manager
  
def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()
  
if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))
  
        p = Process(target=f, args=(d, l))
        p.start()
        p.join()
  
        print(d)
        print(l)
 
# 輸出結果:
{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

Server process manager比 shared memory 更靈活,因為它可以支援任意的物件型別。另外,一個單獨的manager可以通過程序在網路上不同的計算機之間共享,不過他比shared memory要慢。

# manage.dict()共享資料
from multiprocessing import Process,Manager
  
manage = Manager()
dic = manage.dict()
  
def Foo(i):
    dic[i] = 100+i
    print (dic.values())
  
for i in range(2):
    p = Process(target=Foo,args=(i,))
    p.start()
    p.join()

當建立程序時(非使用時),共享資料會被拿到子程序中,當程序中執行完畢後,再賦值給原值。

程序鎖例項

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from multiprocessing import Process, Array, RLock

def Foo(lock,temp,i):
    """
    將第0個數加100
    """
    lock.acquire()
    temp[0] = 100+i
    for item in temp:
        print ('%s----->%s'%(i,item))
    lock.release()

lock = RLock()
temp = Array('i', [11, 22, 33, 44])

for i in range(20):
    p = Process(target=Foo,args=(lock,temp,i,))
    p.start()

程序池

程序池內部維護一個程序序列,當使用時,則去程序池中獲取一個程序,如果程序池序列中沒有可供使用的進程序,那麼程式就會等待,直到程序池中有可用程序為止。

方法:

  • apply(func[, args[, kwds]]) :使用arg和kwds引數呼叫func函式,結果返回前會一直阻塞,由於這個原因,apply_async()更適合併發執行,另外,func函式僅被pool中的一個程序執行。

  • apply_async(func[, args[, kwds[, callback[, error_callback]]]]) : apply()方法的一個變體,會返回一個結果物件。如果callback被指定,那麼callback可以接收一個引數然後被呼叫,當結果準備好回撥時會呼叫callback,呼叫失敗時,則用error_callback替換callback。 Callbacks應被立即完成,否則處理結果的執行緒會被阻塞。

  • close() : 阻止更多的任務提交到pool,待任務完成後,工作程序會退出。

  • terminate() : 不管任務是否完成,立即停止工作程序。在對pool物件程序垃圾回收的時候,會立即呼叫terminate()。

  • join() : wait工作執行緒的退出,在呼叫join()前,必須呼叫close() or terminate()。這樣是因為被終止的程序需要被父程序呼叫wait(join等價與wait),否則程序會成為殭屍程序

程序池中有兩個方法:

  • apply
  • apply_async
from multiprocessing import Pool
import time

def myFun(i):
    time.sleep(2)
    print("mytfun",i)
    return i+100

def end_call(arg):
    print("end_call",arg)


if __name__ == "__main__":
    p = Pool(5)

    # print(p.map(myFun,range(10)))

    for i in range(10):
        p.apply_async(func=myFun,args=(i,),callback=end_call)

    print("end")
    p.close()
    p.join()

官方例程:

from multiprocessing import Pool, TimeoutError
import time
import os
 
def f(x):
    return x*x
 
if __name__ == '__main__':
    # 建立4個程序 
    with Pool(processes=4) as pool:
 
        # 列印 "[0, 1, 4,..., 81]" 
        print(pool.map(f, range(10)))
 
        # 使用任意順序輸出相同的數字, 
        for i in pool.imap_unordered(f, range(10)):
            print(i)
 
        # 非同步執行"f(20)" 
        res = pool.apply_async(f, (20,))      # 只執行一個程序 
        print(res.get(timeout=1))             # 輸出 "400" 
 
        # 非同步執行 "os.getpid()" 
        res = pool.apply_async(os.getpid, ()) # 只執行一個程序 
        print(res.get(timeout=1))             # 輸出程序的 PID 
 
        # 執行多個非同步執行可能會使用多個程序 
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])
 
        # 是一個程序睡10秒 
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("發現一個 multiprocessing.TimeoutError異常")
 
        print("目前,池中還有其他的工作")
 
    # 退出with塊中已經停止的池 
    print("Now the pool is closed and no longer available")

 

三、協程

執行緒和程序的操作是由程式觸發系統介面,最後的執行者是系統;協程的操作則是程式設計師。

協程存在的意義:對於多執行緒應用,CPU通過切片的方式來切換執行緒間的執行,執行緒切換時需要耗時(儲存狀態,下次繼續)。協程,則只使用一個執行緒,在一個執行緒中規定某個程式碼塊執行順序。

協程的適用場景:當程式中存在大量不需要CPU的操作時(IO),適用於協程;

# 安裝
pip install gevent
 
# 匯入模組
import gevent

greenlet

# greenlet
from greenlet import greenlet
 
def test1():
    print(11)
    gr2.switch()
    print(22)
    gr2.switch()
 
def test2():
    print(33)
    gr1.switch()
    print(44)
 
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

''' 
# 輸出結果:
11
33
22
44
'''

gevent

# gevent
import gevent
 
def foo():
    print("Running in foo")
    gevent.sleep(0)
    print("Explicit context switch to foo angin")
 
def bar():
    print("Explicit context to bar")
    gevent.sleep(0)
    print("Implicit context swich back to bar")
 
gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])
 

'''
# 輸出結果:
Running in foo
Explicit context to bar
Explicit context switch to foo angin
Implicit context swich back to bar
'''
遇到IO自動切換
from gevent import monkey
monkey.patch_all()
import gevent
import requests

def f(url):
    print("FET: %s" % url)
    resp = requests.get(url)
    data = len(resp.text)
    print(url, data)

gevent.joinall([
    gevent.spawn(f, 'https://www.python.org/'),
    gevent.spawn(f, 'https://www.yahoo.com/'),
    gevent.spawn(f, 'https://github.com/'),
])

上面的例子,利用協程,一個執行緒完成所有的請求,發出請求的時候,不會等待回覆,而是一次性將所有的請求都發出求,收到一個回覆就處理一個回覆,這樣一個執行緒就解決了所有的事情,效率極高。