1. 程式人生 > >Python 線程----線程方法,線程事件,線程隊列,線程池,GIL鎖,協程,Greenlet

Python 線程----線程方法,線程事件,線程隊列,線程池,GIL鎖,協程,Greenlet

opensta 回收 chunks www 不能 第一個元素 連接 none 恢復

主要內容:

  線程的一些其他方法

  線程事件

  線程隊列

  線程池

  GIL鎖

  協程

  Greenlet

  Gevent

一. 線程(threading)的一些其他方法

技術分享圖片
from threading import Thread
import threading
import time

def work():
    time.sleep(1)
    print("子線程對象>>>", threading.current_thread())               # 子線程對象
    print("子線程名稱>>>", threading.current_thread().getName())     #
子線程名稱 print("子線程ID>>>", threading.get_ident()) # 子線程ID if __name__ == __main__: t = Thread(target=work) # 創建子線程 t.start() # 開啟子線程 print("主線程對象>>>", threading.current_thread()) # 主線程對象 print("主線程名稱>>>
", threading.current_thread().getName()) # 主線程名稱 print("主線程ID>>>", threading.current_thread().ident) # 主線程ID print("主線程ID>>>", threading.get_ident()) # 主線程ID time.sleep(1) # 阻塞住,此時主線程代碼運行的同時子線程代碼也在運行 print(threading.enumerate()) #
拿到所有正在運行的線程對象(包括主線程) print(threading.active_count()) # 拿到所有正在運行的線程對象的數量 print("主線程/主進程執行完畢")
threading的一些其他方法

二. 線程事件

同進程的一樣. 線程的一個關鍵特性是每個線程都是獨立運行且狀態不可預測。如果程序中的其他線程需要通過判斷某個線程的狀態來確定自己下一步的操作,這時線程同步問題就會變得非常棘手。為了解決這些問題,我們需要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標誌,它允許線程等待某些事件的發生。在初始情況下,Event對象中的信號標誌被設置為假。如果有線程等待一個Event對象, 而這個Event對象的標誌為假,那麽這個線程將會被一直阻塞直至該標誌為真。一個線程如果將一個Event對象的信號標誌設置為真,它將喚醒所有等待這個Event對象的線程。如果一個線程等待一個已經被設置為真的Event對象,那麽它將忽略這個事件, 繼續執行.

  事件的基本方法:

event.isSet():返回event的狀態值;
event.wait():如果 event.isSet()==False將阻塞線程;
event.set(): 設置event的狀態值為True,所有阻塞池的線程激活進入就緒狀態, 等待操作系統調度;
event.clear():恢復event的狀態值為False。

  舉例說明:

有多個工作線程嘗試鏈接MySQL,我們想要在鏈接前確保MySQL服務正常才讓那些工作線程去連接MySQL服務器,如果連接不成功,都會去嘗試重新連接。我們現在采用threading.Event機制來協調各個工作線程的連接操作.

MySQL簡述:

mysql就是一個數據庫,存數據用的東西,它就像一個文件夾,裏面存著很多的excel表格,我們可以在表格裏面寫數據,存數據。但是如果我們要使用數據庫,我們必須先要去連接它,你和他建立了連接關系,你才能操作它裏面存放的數據。

模擬一個場景,開啟兩個線程:

  線程一: 連接數據庫,這個線程需要等待一個信號,告訴我們雙方之間的網絡是可以連通的.

  線程二:檢測與數據庫之間的網絡是否聯通,並發送一個可聯通或者不可聯通的信號.

技術分享圖片
from threading import Thread,Event
import threading
import time,random
def conn_mysql():
    count=1
    while not event.is_set():
        if count > 3:
            raise TimeoutError(鏈接超時) #自己發起錯誤
        print(<%s>第%s次嘗試鏈接 % (threading.current_thread().getName(), count))
        event.wait(0.5) #
        count+=1
    print(<%s>鏈接成功 %threading.current_thread().getName())


def check_mysql():
    print(\033[45m[%s]正在檢查mysql\033[0m % threading.current_thread().getName())
    t1 = random.randint(0,3)
    print(>>>>,t1)
    time.sleep(t1)
    event.set()
if __name__ == __main__:
    event=Event()
    check = Thread(target=check_mysql)
    conn1=Thread(target=conn_mysql)
    conn2=Thread(target=conn_mysql)

    check.start()
    conn1.start()
    conn2.start()
模擬連接的代碼示例

三. 線程隊列

queue隊列: 使用import queue , 用法與進程Queue一樣.

  queue.Queue(maxsize=0) -- 先進先出

技術分享圖片
import queue #不需要通過threading模塊裏面導入,直接import queue就可以了,這是python自帶的,用法基本和我們進程multiprocess中的queue是一樣的

q=queue.Queue()
q.put(first)
q.put(second)
q.put(third)
# q.put_nowait() #沒有數據就報錯,可以通過try來搞

print(q.get())
print(q.get())
print(q.get())
# q.get_nowait() #沒有數據就報錯,可以通過try來搞

# 執行結果: (先進先出)
# first
# second
# third
先進先出示例代碼

  queue.LifoQueue(maxsize=0) -- last in first out 後進先出

技術分享圖片
import queue

q=queue.LifoQueue() #隊列,類似於棧,後進先出的順序
q.put(first)
q.put(second)
q.put(third)
# q.put_nowait()

print(q.get())
print(q.get())
print(q.get())
# q.get_nowait()

# 執行結果:(後進先出)
# third
# second
# first
後進先出示例代碼

  queue.PriorityQueue(maxsize=0) -- 存儲數據時可以設置優先級隊列

技術分享圖片
rt queue

q = queue.PriorityQueue()   # 創建棧

# put()方法放置一個元組進入棧,元組的第一個元素是優先級(通常是數字,也可以是非數字之間的比較),數字越小優先級越高
q.put((-10, "a"))
q.put((-5, "a"))  # 負數也可以
# q.put((20,"ws"))  # 如果兩個值的優先級一樣,那麽按照後面的值的acsii碼順序來排序,如果字符串第一個數元素相同,比較第二個元素的acsii碼順序
# q.put((20,"wd"))
# q.put((20,{"a": 11})) # TypeError: unorderable types: dict() < dict() 不能是字典
# q.put((20,("w", 1)))  # 優先級相同的兩個數據,他們後面的值必須是相同的數據類型才能比較,可以是元祖,也是通過元素的ascii碼順序來排序

q.put((20, "b"))
q.put((20, "a"))
q.put((0, "b"))
q.put((30, "c"))

print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())

# 數字越小優先級越高,優先級高的優先出隊
優先級隊列示例代碼

四. 線程池

早期的時候並沒有線程池,現在Python提供了一個新的標準或者說內置的模塊,這個模塊裏面提供了新的線程池和進程池.

  模塊介紹:

# concurrent.futures模塊提供了高度封裝的異步調用接口
# ThreadPoolExecutor: 線程池,提供異步調用
# ProcessPoolExecutor: 進程池,提供異步調用

# 基本方法:
# submit(func, *args, **kwargs) -- 異步提交任務

# map(func, *iterables, timeout=None, chunksize=1) -- 取代for循環submit的操作

# shutdown(wait=True) -- 相當於進程池的pool.close()和pool.join()操作
需要註意的是:
wait=True,等待池內所有任務執行完畢回收完資源後才繼續
wait=False,立即返回,並不會等待池內的任務執行完畢
但不管wait參數為何值,整個程序都會等到所有任務執行完畢
submit和map必須在shutdown之前

# result(timeout=None) -- 取得結果,相當於get(),如果沒有值可取會阻塞住

# add_done_callback(func) -- 回調函數

  ThreadPoolExecutor的簡單使用

技術分享圖片
import time
from concurrent.futures import ThreadPoolExecutor

def func(n):
    time.sleep(2)
    return n**2

if __name__ == __main__:
    thread_pool = ThreadPoolExecutor(max_workers=4) # 創建線程池對象,默認一般開啟的線程數量不超過CPU個數的5倍
    t_list = []
    for i in range(20):
        t = thread_pool.submit(func, i) # 提交執行函數,返回一個結果對象,i作為任務函數的參數.submit(func, *args, **kwargs)可以傳遞任意形式的參數
        t_list.append(t)

    # thread_pool.shutdown()  # shutdown()的作用相當於close()和join(),等待所有的線程執行完畢
    # for tt in t_list:
    #     print(">>>", tt.result())

    # 也可以不用shutdown()方法,改換下面這種方式:
    for n, tt in enumerate(t_list):   # enumerate()枚舉
        print(">>>", n, tt.result())
    time.sleep(2)
ThreadPoolExecutor的簡單使用

  補充: ProcessPoolExecutor的使用

只需要將這一行代碼改為下面這一行就可以了,其他的代碼都不用變
tpool = ThreadPoolExecutor(max_workers=5) #默認一般起線程的數據不超過CPU個數的5倍
# tpool = ProcessPoolExecutor(max_workers=5)

你就會發現為什麽將線程池和進程池都放到這一個模塊裏面了,用法一樣

技術分享圖片
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import threading
import time, random

def task(n):
    print("{} is runing".format(threading.get_ident())) # 子線程ID號
    time.sleep(random.randint(1, 3))
    return n**2

if __name__ == __main__:
    executor = ThreadPoolExecutor(max_workers=3)    # 創建線程池對象,設置的線程數量為3
    for i in range(11):
        future = executor.submit(task, i)
    s = executor.map(task, range(1, 5)) # map()取代了 for循環+submit()
    print([i for i in s])
map的使用 技術分享圖片
import time
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

def func(n):
    time.sleep(2)
    return n*n

def call_back(m):
    print("結果為:{}".format(m.result()))

if __name__ == __main__:
    tpool = ThreadPoolExecutor(max_workers=5)   # 創建進程池對象
    t_list = []
    for i in range(5):
        t = tpool.submit(func, i).add_done_callback(call_back)
回調函數的簡單應用 技術分享圖片
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
    print(<進程%s> get %s %(os.getpid(),url))
    respone=requests.get(url)
    if respone.status_code == 200:
        return {url:url,text:respone.text}

def parse_page(res):
    res=res.result()
    print(<進程%s> parse %s %(os.getpid(),res[url]))
    parse_res=url:<%s> size:[%s]\n %(res[url],len(res[text]))
    with open(db.txt,a) as f:
        f.write(parse_res)


if __name__ == __main__:
    urls=[
        https://www.baidu.com,
        https://www.python.org,
        https://www.openstack.org,
        https://help.github.com/,
        http://www.sina.com.cn/
    ]

    # p=Pool(3)
    # for url in urls:
    #     p.apply_async(get_page,args=(url,),callback=pasrse_page)
    # p.close()
    # p.join()

    p=ProcessPoolExecutor(3)
    for url in urls:
        p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一個future對象obj,需要用obj.result()拿到結果
回調函數的應用

五. GIL鎖

參考資料

Python 線程----線程方法,線程事件,線程隊列,線程池,GIL鎖,協程,Greenlet