1. 程式人生 > >執行緒進階之執行緒佇列、執行緒池和協程

執行緒進階之執行緒佇列、執行緒池和協程

本節目錄:

1.執行緒佇列

2.執行緒池

3.協程

 

一、執行緒佇列

  執行緒之間的通訊我們列表行不行呢,當然行,那麼佇列和列表有什麼區別呢?

  queue佇列 :使用import queue,用法與程序Queue一樣

  queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

  class queue.Queue(maxsize=0) #先進先出
import queue #不需要通過threading模組裡面匯入,直接import queue就可以了,這是python自帶的
#用法基本和我們程序multiprocess中的queue是一樣的
q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')
# q.put_nowait() #沒有資料就報錯,可以通過try來搞
print(q.get())
print(q.get())
print(q.get())
# q.get_nowait() #沒有資料就報錯,可以通過try來搞
'''
結果(先進先出):
first
second
third
'''
先進先出示例程式碼

 

  class queue.LifoQueue(maxsize=0) #last in fisrt out

import queue

q=queue.LifoQueue() #佇列,類似於棧,棧我們提過嗎,是不是先進後出的順序啊
q.put('first')
q.put('second')
q.put('third')
# q.put_nowait()

print(q.get())
print(q.get())
print(q.get())
# q.get_nowait()
'''
結果(後進先出):
third
second
first
'''
先進後出示例程式碼

 

  class queue.PriorityQueue(maxsize=0) #儲存資料時可設定優先順序的佇列

import queue

q=queue.PriorityQueue()
#put進入一個元組,元組的第一個元素是優先順序(通常是數字,也可以是非數字之間的比較),數字越小優先順序越高
q.put((-10,'a'))
q.put((-5,'a'))  #負數也可以
# q.put((20,'ws'))  #如果兩個值的優先順序一樣,那麼按照後面的值的acsii碼順序來排序,如果字串第一個數元素相同,比較第二個元素的acsii碼順序
# q.put((20,'wd'))
# q.put((20,{'a':11})) #TypeError: unorderable types: dict() < dict() 不能是字典
# q.put((20,('w',1)))  #優先順序相同的兩個資料,他們後面的值必須是相同的資料型別才能比較,可以是元祖,也是通過元素的ascii碼順序來排序

q.put((20,'b'))
q.put((20,'a'))
q.put((0,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())
'''
結果(數字越小優先順序越高,優先順序高的優先出隊):
'''
優先順序佇列示例程式碼

  這三種佇列都是執行緒安全的,不會出現多個執行緒搶佔同一個資源或資料的情況。

 

二、執行緒池

  Python標準模組——concurrent.futures

  到這裡就差我們的執行緒池沒有遇到了,我們用一個新的模組給大家講,早期的時候我們沒有執行緒池,現在python提供了一個新的標準或者說內建的模組,這個模組裡面提供了新的執行緒池和程序池,之前我們說的程序池是在multiprocessing裡面的,現在這個在這個新的模組裡面,他倆用法上是一樣的。

  為什麼要將程序池和執行緒池放到一起呢,是為了統一使用方式,使用threadPollExecutor和ProcessPollExecutor的方式一樣,而且只要通過這個concurrent.futures匯入就可以直接用他們兩個了

concurrent.futures模組提供了高度封裝的非同步呼叫介面
ThreadPoolExecutor:執行緒池,提供非同步呼叫
ProcessPoolExecutor: 程序池,提供非同步呼叫
Both implement the same interface, which is defined by the abstract Executor class.

#2 基本方法
#submit(fn, *args, **kwargs)
非同步提交任務

#map(func, *iterables, timeout=None, chunksize=1) 
取代for迴圈submit的操作

#shutdown(wait=True) 
相當於程序池的pool.close()+pool.join()操作
wait=True,等待池內所有任務執行完畢回收完資源後才繼續
wait=False,立即返回,並不會等待池內的任務執行完畢
但不管wait引數為何值,整個程式都會等到所有任務執行完畢
submit和map必須在shutdown之前

#result(timeout=None)
取得結果

#add_done_callback(fn)
回撥函式

 

import time
import os
import threading
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

def func(n):
    time.sleep(2)
    print('%s列印的:'%(threading.get_ident()),n)
    return n*n
tpool = ThreadPoolExecutor(max_workers=5) #預設一般起執行緒的資料不超過CPU個數*5
# tpool = ProcessPoolExecutor(max_workers=5) #程序池的使用只需要將上面的ThreadPoolExecutor改為ProcessPoolExecutor就行了,其他都不用改
#非同步執行
t_lst = []
for i in range(5):
    t = tpool.submit(func,i) #提交執行函式,返回一個結果物件,i作為任務函式的引數 def submit(self, fn, *args, **kwargs):  可以傳任意形式的引數
    t_lst.append(t)  #
    # print(t.result())
    #這個返回的結果物件t,不能直接去拿結果,不然又變成串行了,可以理解為拿到一個號碼,等所有執行緒的結果都出來之後,我們再去通過結果物件t獲取結果
tpool.shutdown() #起到原來的close阻止新任務進來 + join的作用,等待所有的執行緒執行完畢
print('主執行緒')
for ti in t_lst:
    print('>>>>',ti.result())

# 我們還可以不用shutdown(),用下面這種方式
# while 1:
#     for n,ti in enumerate(t_lst):
#         print('>>>>', ti.result(),n)
#     time.sleep(2) #每個兩秒去去一次結果,哪個有結果了,就可以取出哪一個,想表達的意思就是說不用等到所有的結果都出來再去取,可以輪詢著去取結果,因為你的任務需要執行的時間很長,那麼你需要等很久才能拿到結果,通過這樣的方式可以將快速出來的結果先拿出來。如果有的結果物件裡面還沒有執行結果,那麼你什麼也取不到,這一點要注意,不是空的,是什麼也取不到,那怎麼判斷我已經取出了哪一個的結果,可以通過列舉enumerate來搞,記錄你是哪一個位置的結果物件的結果已經被取過了,取過的就不再取了

#結果分析: 列印的結果是沒有順序的,因為到了func函式中的sleep的時候執行緒會切換,誰先列印就沒準兒了,但是最後的我們通過結果物件取結果的時候拿到的是有序的,因為我們主執行緒進行for迴圈的時候,我們是按順序將結果物件新增到列表中的。
# 37220列印的: 0
# 32292列印的: 4
# 33444列印的: 1
# 30068列印的: 2
# 29884列印的: 3
# 主執行緒
# >>>> 0
# >>>> 1
# >>>> 4
# >>>> 9
# >>>> 16
ThreadPoolExecutor的簡單使用

  ThreadPoolExecutor的使用:

只需要將這一行程式碼改為下面這一行就可以了,其他的程式碼都不用變
tpool = ThreadPoolExecutor(max_workers=5) #預設一般起執行緒的資料不超過CPU個數*5
# tpool = ProcessPoolExecutor(max_workers=5)

你就會發現為什麼將執行緒池和程序池都放到這一個模組裡面了,用法一樣

 

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import threading
import os,time,random
def task(n):
    print('%s is runing' %threading.get_ident())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ThreadPoolExecutor(max_workers=3)

    # for i in range(11):
    #     future=executor.submit(task,i)

    s = executor.map(task,range(1,5)) #map取代了for+submit
    print([i for i in s])
map的使用

 

import time
import os
import threading
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

def func(n):
    time.sleep(2)
    return n*n

def call_back(m):
    print('結果為:%s'%(m.result()))

tpool = ThreadPoolExecutor(max_workers=5)
t_lst = []
for i in range(5):
    t = tpool.submit(func,i).add_done_callback(call_back)
回撥函式簡單應用

 

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
    print('<程序%s> get %s' %(os.getpid(),url))
    respone=requests.get(url)
    if respone.status_code == 200:
        return {'url':url,'text':respone.text}

def parse_page(res):
    res=res.result()
    print('<程序%s> parse %s' %(os.getpid(),res['url']))
    parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    # p=Pool(3)
    # for url in urls:
    #     p.apply_async(get_page,args=(url,),callback=pasrse_page)
    # p.close()
    # p.join()

    p=ProcessPoolExecutor(3)
    for url in urls:
        p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一個future物件obj,需要用obj.result()拿到結果
回撥函式的應用

 

三、協程

  執行緒實現併發的最小單位

  併發:記錄狀態+切換


1.生成器版生成器(僅僅是模仿了下大概的思路,實質沒有節省資源)

import time

def f1():
    for i in range(10):
        time.sleep(0.5)
        print('f1>>',i)
        yield

def f2():
    g = f1()
    for i in range(10):
        time.sleep(0.5)
        print('f2>>', i)
        next(g)

f1()
f2()
生成器版協程

 

2.greenlet版協程

大概和生成器版差不多,兩個方法來回切換。偽協程!

import time
from greenlet import greenlet
def f1(s):
    print('第一次f1'+s)
    g2.switch('taibai')  #切換到g2這個物件的任務去執行
    time.sleep(1)
    print('第二次f1'+s)
    g2.switch()
def f2(s):
    print('第一次f2'+s)
    g1.switch()
    time.sleep(1)
    print('第二次f2'+s)
g1 = greenlet(f1)  #例項化一個greenlet物件,並將任務名稱作為引數參進去
g2 = greenlet(f2)
g1.switch('alex') #執行g1物件裡面的任務
greenlet版的協程

 

3.gevent版協程(真正的協程)

import gevent
import time

def f1():
    print("第一次f1")
    gevent.sleep(1)
    print("第二次f1")
    
def f2():
    print("第一次f2")
    gevent.sleep(2)
    print("第二次f2")
    
s = time.time()
g1 = gevent.spawn(f1) #非同步提交了f1任務
g2 = gevent.spawn(f2) #非同步提交了f2任務
g1.join()
g2.join()
e = time.time()
print("執行時間:",e-s)
print("主程式任務")
gevent版協程(不完美版)

大家會發現一個問題就是隻能使用gevent.sleep來代替time.sleep。還有就是要g1.join()和g2.join()有些麻煩對不對,下面就是協程gevent版的升級版。

import gevent
import time
from gevent import monkey;monkey.patch_all() #可以接收所有的I/O
def f1():
    print("第一次f1")
    time.sleep(1)
    print("第二次f1")

def f2():
    print("第一次f2")
    time.sleep(2)
    print("第二次f2")

s = time.time()
g1 = gevent.spawn(f1) #非同步提交了f1任務
g2 = gevent.spawn(f2) #非同步提交了f2任務

gevent.joinall([g1,g2]) #一個列表裡面是任務名等同於g1.join()和g2.join()
e = time.time()
print("執行時間:",e-s)
print("主程式任務")
gevent版協程(升級版)