1. 程式人生 > >線程進階之線程隊列、線程池和協程

線程進階之線程隊列、線程池和協程

最小 for循環 display llb key adf func from 通信

本節目錄:

1.線程隊列

2.線程池

3.協程

一、線程隊列

  線程之間的通信我們列表行不行呢,當然行,那麽隊列和列表有什麽區別呢?

  queue隊列 :使用import queue,用法與進程Queue一樣

  queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

  class queue.Queue(maxsize=0) #先進先出
技術分享圖片
import queue #不需要通過threading模塊裏面導入,直接import queue就可以了,這是python自帶的
#用法基本和我們進程multiprocess中的queue是一樣的 q=queue.Queue() q.put(first) q.put(second) q.put(third) # q.put_nowait() #沒有數據就報錯,可以通過try來搞 print(q.get()) print(q.get()) print(q.get()) # q.get_nowait() #沒有數據就報錯,可以通過try來搞 ‘‘‘ 結果(先進先出): first second third ‘‘‘
先進先出示例代碼

  class queue.LifoQueue(maxsize=0) #last in fisrt out

技術分享圖片
import queue

q=queue.LifoQueue() #隊列,類似於棧,棧我們提過嗎,是不是先進後出的順序啊
q.put(first)
q.put(second)
q.put(third)
# q.put_nowait()

print(q.get())
print(q.get())
print(q.get())
# q.get_nowait()
‘‘‘
結果(後進先出):
third
second
first
‘‘‘
先進後出示例代碼

  class queue.PriorityQueue(maxsize=0) #存儲數據時可設置優先級的隊列

技術分享圖片
import
queue q=queue.PriorityQueue() #put進入一個元組,元組的第一個元素是優先級(通常是數字,也可以是非數字之間的比較),數字越小優先級越高 q.put((-10,a)) q.put((-5,a)) #負數也可以 # q.put((20,‘ws‘)) #如果兩個值的優先級一樣,那麽按照後面的值的acsii碼順序來排序,如果字符串第一個數元素相同,比較第二個元素的acsii碼順序 # q.put((20,‘wd‘)) # q.put((20,{‘a‘:11})) #TypeError: unorderable types: dict() < dict() 不能是字典 # q.put((20,(‘w‘,1))) #優先級相同的兩個數據,他們後面的值必須是相同的數據類型才能比較,可以是元祖,也是通過元素的ascii碼順序來排序 q.put((20,b)) q.put((20,a)) q.put((0,b)) q.put((30,c)) print(q.get()) print(q.get()) print(q.get()) print(q.get()) print(q.get()) print(q.get()) ‘‘‘ 結果(數字越小優先級越高,優先級高的優先出隊): ‘‘‘
優先級隊列示例代碼

  這三種隊列都是線程安全的,不會出現多個線程搶占同一個資源或數據的情況。

二、線程池

  Python標準模塊——concurrent.futures

  到這裏就差我們的線程池沒有遇到了,我們用一個新的模塊給大家講,早期的時候我們沒有線程池,現在python提供了一個新的標準或者說內置的模塊,這個模塊裏面提供了新的線程池和進程池,之前我們說的進程池是在multiprocessing裏面的,現在這個在這個新的模塊裏面,他倆用法上是一樣的。

  為什麽要將進程池和線程池放到一起呢,是為了統一使用方式,使用threadPollExecutor和ProcessPollExecutor的方式一樣,而且只要通過這個concurrent.futures導入就可以直接用他們兩個了

concurrent.futures模塊提供了高度封裝的異步調用接口
ThreadPoolExecutor:線程池,提供異步調用
ProcessPoolExecutor: 進程池,提供異步調用
Both implement the same interface, which is defined by the abstract Executor class.

#2 基本方法
#submit(fn, *args, **kwargs)
異步提交任務

#map(func, *iterables, timeout=None, chunksize=1) 
取代for循環submit的操作

#shutdown(wait=True) 
相當於進程池的pool.close()+pool.join()操作
wait=True,等待池內所有任務執行完畢回收完資源後才繼續
wait=False,立即返回,並不會等待池內的任務執行完畢
但不管wait參數為何值,整個程序都會等到所有任務執行完畢
submit和map必須在shutdown之前

#result(timeout=None)
取得結果

#add_done_callback(fn)
回調函數

技術分享圖片
import time
import os
import threading
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

def func(n):
    time.sleep(2)
    print(%s打印的:%(threading.get_ident()),n)
    return n*n
tpool = ThreadPoolExecutor(max_workers=5) #默認一般起線程的數據不超過CPU個數*5
# tpool = ProcessPoolExecutor(max_workers=5) #進程池的使用只需要將上面的ThreadPoolExecutor改為ProcessPoolExecutor就行了,其他都不用改
#異步執行
t_lst = []
for i in range(5):
    t = tpool.submit(func,i) #提交執行函數,返回一個結果對象,i作為任務函數的參數 def submit(self, fn, *args, **kwargs):  可以傳任意形式的參數
    t_lst.append(t)  #
    # print(t.result())
    #這個返回的結果對象t,不能直接去拿結果,不然又變成串行了,可以理解為拿到一個號碼,等所有線程的結果都出來之後,我們再去通過結果對象t獲取結果
tpool.shutdown() #起到原來的close阻止新任務進來 + join的作用,等待所有的線程執行完畢
print(主線程)
for ti in t_lst:
    print(>>>>,ti.result())

# 我們還可以不用shutdown(),用下面這種方式
# while 1:
#     for n,ti in enumerate(t_lst):
#         print(‘>>>>‘, ti.result(),n)
#     time.sleep(2) #每個兩秒去去一次結果,哪個有結果了,就可以取出哪一個,想表達的意思就是說不用等到所有的結果都出來再去取,可以輪詢著去取結果,因為你的任務需要執行的時間很長,那麽你需要等很久才能拿到結果,通過這樣的方式可以將快速出來的結果先拿出來。如果有的結果對象裏面還沒有執行結果,那麽你什麽也取不到,這一點要註意,不是空的,是什麽也取不到,那怎麽判斷我已經取出了哪一個的結果,可以通過枚舉enumerate來搞,記錄你是哪一個位置的結果對象的結果已經被取過了,取過的就不再取了

#結果分析: 打印的結果是沒有順序的,因為到了func函數中的sleep的時候線程會切換,誰先打印就沒準兒了,但是最後的我們通過結果對象取結果的時候拿到的是有序的,因為我們主線程進行for循環的時候,我們是按順序將結果對象添加到列表中的。
# 37220打印的: 0
# 32292打印的: 4
# 33444打印的: 1
# 30068打印的: 2
# 29884打印的: 3
# 主線程
# >>>> 0
# >>>> 1
# >>>> 4
# >>>> 9
# >>>> 16
ThreadPoolExecutor的簡單使用

  ThreadPoolExecutor的使用:

只需要將這一行代碼改為下面這一行就可以了,其他的代碼都不用變
tpool = ThreadPoolExecutor(max_workers=5) #默認一般起線程的數據不超過CPU個數*5
# tpool = ProcessPoolExecutor(max_workers=5)

你就會發現為什麽將線程池和進程池都放到這一個模塊裏面了,用法一樣

技術分享圖片
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import threading
import os,time,random
def task(n):
    print(%s is runing %threading.get_ident())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == __main__:

    executor=ThreadPoolExecutor(max_workers=3)

    # for i in range(11):
    #     future=executor.submit(task,i)

    s = executor.map(task,range(1,5)) #map取代了for+submit
    print([i for i in s])
map的使用

技術分享圖片
import time
import os
import threading
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

def func(n):
    time.sleep(2)
    return n*n

def call_back(m):
    print(結果為:%s%(m.result()))

tpool = ThreadPoolExecutor(max_workers=5)
t_lst = []
for i in range(5):
    t = tpool.submit(func,i).add_done_callback(call_back)
回調函數簡單應用

技術分享圖片
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
    print(<進程%s> get %s %(os.getpid(),url))
    respone=requests.get(url)
    if respone.status_code == 200:
        return {url:url,text:respone.text}

def parse_page(res):
    res=res.result()
    print(<進程%s> parse %s %(os.getpid(),res[url]))
    parse_res=url:<%s> size:[%s]\n %(res[url],len(res[text]))
    with open(db.txt,a) as f:
        f.write(parse_res)


if __name__ == __main__:
    urls=[
        https://www.baidu.com,
        https://www.python.org,
        https://www.openstack.org,
        https://help.github.com/,
        http://www.sina.com.cn/
    ]

    # p=Pool(3)
    # for url in urls:
    #     p.apply_async(get_page,args=(url,),callback=pasrse_page)
    # p.close()
    # p.join()

    p=ProcessPoolExecutor(3)
    for url in urls:
        p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一個future對象obj,需要用obj.result()拿到結果
回調函數的應用

三、協程

  線程實現並發的最小單位

  並發:記錄狀態+切換


1.生成器版生成器(僅僅是模仿了下大概的思路,實質沒有節省資源)

技術分享圖片
import time

def f1():
    for i in range(10):
        time.sleep(0.5)
        print(f1>>,i)
        yield

def f2():
    g = f1()
    for i in range(10):
        time.sleep(0.5)
        print(f2>>, i)
        next(g)

f1()
f2()
生成器版協程

2.greenlet版協程

大概和生成器版差不多,兩個方法來回切換。偽協程!

技術分享圖片
import time
from greenlet import greenlet
def f1(s):
    print(第一次f1+s)
    g2.switch(taibai)  #切換到g2這個對象的任務去執行
    time.sleep(1)
    print(第二次f1+s)
    g2.switch()
def f2(s):
    print(第一次f2+s)
    g1.switch()
    time.sleep(1)
    print(第二次f2+s)
g1 = greenlet(f1)  #實例化一個greenlet對象,並將任務名稱作為參數參進去
g2 = greenlet(f2)
g1.switch(alex) #執行g1對象裏面的任務
greenlet版的協程

3.gevent版協程(真正的協程)

技術分享圖片
import gevent
import time

def f1():
    print("第一次f1")
    gevent.sleep(1)
    print("第二次f1")
    
def f2():
    print("第一次f2")
    gevent.sleep(2)
    print("第二次f2")
    
s = time.time()
g1 = gevent.spawn(f1) #異步提交了f1任務
g2 = gevent.spawn(f2) #異步提交了f2任務
g1.join()
g2.join()
e = time.time()
print("執行時間:",e-s)
print("主程序任務")
gevent版協程(不完美版)

大家會發現一個問題就是只能使用gevent.sleep來代替time.sleep。還有就是要g1.join()和g2.join()有些麻煩對不對,下面就是協程gevent版的升級版。

技術分享圖片
import gevent
import time
from gevent import monkey;monkey.patch_all() #可以接收所有的I/O
def f1():
    print("第一次f1")
    time.sleep(1)
    print("第二次f1")

def f2():
    print("第一次f2")
    time.sleep(2)
    print("第二次f2")

s = time.time()
g1 = gevent.spawn(f1) #異步提交了f1任務
g2 = gevent.spawn(f2) #異步提交了f2任務

gevent.joinall([g1,g2]) #一個列表裏面是任務名等同於g1.join()和g2.join()
e = time.time()
print("執行時間:",e-s)
print("主程序任務")
gevent版協程(升級版)

線程進階之線程隊列、線程池和協程