1. 程式人生 > >Python基礎34_執行緒-條件,定時器,佇列,執行緒池, 協程

Python基礎34_執行緒-條件,定時器,佇列,執行緒池, 協程

 執行緒
一. 條件
    使得執行緒等待,只有滿足某條件時,才釋放n個執行緒
    import time
    from threading import Thread,RLock,Condition,current_thread
    
    def func1(c):
        c.acquire(False) #固定格式
        # print(1111)
    
        c.wait()  #等待通知,
        time.sleep(3)  #通知完成後大家是序列執行的,這也看出了鎖的機制了
        print('%s執行了'%(current_thread().getName()))
    
        c.release()
    
    if __name__ == '__main__':
        c = Condition()
        for i in range(5):
            t = Thread(target=func1,args=(c,))
            t.start()
    
        while True:
            num = int(input('請輸入你要通知的執行緒個數:'))
            c.acquire() #固定格式
            c.notify(num)  #通知num個執行緒別等待了,去執行吧
            c.release()
    
    #結果分析: 
    # 請輸入你要通知的執行緒個數:3
    # 請輸入你要通知的執行緒個數:Thread-1執行了 #有時候你會發現的你結果列印在了你要輸入內容的地方,這是列印的問題,沒關係,不影響
    # Thread-3執行了
    # Thread-2執行了
二. 定時器
    定時器,指定n秒後執行某個操作,這個做定時任務的時候可能會用到
    import time
    from threading import Timer,current_thread #這裡就不需要再引入Timer
    import threading
    def hello():
        print(current_thread().getName())
        print("hello, world")
        # time.sleep(3) #如果你的子執行緒的程式執行時間比較長,那麼這個定時任務也會亂,當然了,主要還是看業務需求
    t = Timer(10, hello)  #建立一個子執行緒去執行後面的函式
    t.start()  # after 1 seconds, "hello, world" will be printed
    # for i in range(5):
    #     t = Timer(2, hello)
    #     t.start()  
    #     time.sleep(3) #這個是建立一個t用的時間是2秒,創建出來第二個的時候,第一個已經過了兩秒了,所以你的5個t的執行結果基本上就是2秒中,這個延遲操作。
    
    print(threading.active_count())
    print('主程序',current_thread().getName())
三. 執行緒佇列
    queue佇列 :使用import queue,用法與程序Queue一樣
    queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
    1. class queue.Queue(maxsize=0)    先進先出
    import queue
    q = queue.Queue(3)
    q.put(1)
    q.put(2)
    print("當前佇列長度: ", q.qsize())
    q.put(3)
    print("檢視佇列狀態: ", q.full())
    try:
        q.put_nowait(4)
    except Exception:
        print("當前佇列已滿")
    print(q.get())
    print(q.get())
    print(q.get())
    print("檢視佇列狀態: ", q.empty())
    try:
        q.get_nowait()
    except Exception:
        print("佇列已空")
    2. class queue.LifoQueue(maxsize=0)    後進先出
    q = queue.LifoQueue(3)
    q.put(1)
    q.put(2)
    q.put(3)
    print("檢視佇列狀態: " ,q.full())
    print(q.get())
    print(q.get())
    print("檢視當前佇列長度: ", q.qsize())
    print(q.get())
    3. class queue.PriorityQueue(maxsize=0)    優先順序佇列
    def f1():
        pass
    class Animal:
        pass
    a = Animal()
    q = queue.PriorityQueue(3)
    q.put((1, Animal))
    q.put((3, f1))
    q.put((5, a))
    print(q.get())
    print(q.get())
    print(q.get())
    這三種佇列都是執行緒安全的,不會出現多個執行緒搶佔同一個資源或資料的情況。
四. 執行緒池 concurrent.futures 模組
    早期的時候我們沒有執行緒池,現在python提供了一個新的標準或者說內建的模組,這個模組裡面提供了新的執行緒池和程序池,之前我們說的程序池是在multiprocessing裡面的,現在這個在這個新的模組裡面,他倆用法上是一樣的。
    為什麼要將程序池和執行緒池放到一起呢,是為了統一使用方式,使用threadPollExecutor和ProcessPollExecutor的方式一樣,而且只要通過這個concurrent.futures匯入就可以直接用他們兩個了:
    concurrent.futures模組提供了高度封裝的非同步呼叫介面
    ThreadPoolExecutor:執行緒池,提供非同步呼叫
    ProcessPoolExecutor: 程序池,提供非同步呼叫
    Both implement the same interface, which is defined by the abstract Executor class.
    1. 基本方法
    #submit(fn, *args, **kwargs)
    非同步提交任務
    #map(func, *iterables, timeout=None, chunksize=1) 
    取代for迴圈submit的操作
    #shutdown(wait=True) 
    相當於程序池的pool.close()+pool.join()操作
    wait=True,等待池內所有任務執行完畢回收完資源後才繼續
    wait=False,立即返回,並不會等待池內的任務執行完畢
    但不管wait引數為何值,整個程式都會等到所有任務執行完畢
    submit和map必須在shutdown之前
    #result(timeout=None)
    取得結果
    #add_done_callback(fn)
    回撥函式
    2. 執行緒池的簡單使用
    import time, os, threading
    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
    def func(n):
        time.sleep(1)
        print("%s列印的: %s"%(threading.get_ident(), n))
        return n*n
    # 預設一般起執行緒個數不超過CPU個數*5
    tpool = ThreadPoolExecutor(max_workers=5)
    # 非同步執行
    t_list = []
    for i in range(5):
        # 提交執行函式,返回一個結果物件,i作為任務函式的引數 def submit(self, fn, *args, **kwargs):可以傳任意形式的引數
        t = tpool.submit(func, i)
        t_list.append(t)
        # print(t.result())
        # 這個返回的結果物件t,不能直接去拿結果,不然又變成串行了,可以理解為拿到一個號碼,等所有執行緒的結果都出來之後,我們再去通過結果物件t獲取結果
    tpool.shutdown()
    # 起到原來的close阻止新任務進來 + join的作用,等待所有的執行緒執行完畢
    print("主執行緒")
    for ti in t_list:
        print(">>>", ti.result())
    # 我們還可以不用shutdown(),用下面這種方式
    # while 1:
    #     for n,ti in enumerate(t_list):
    #         print('>>>>', ti.result(),n)
    #     time.sleep(1)
        #每個兩秒去去一次結果,哪個有結果了,就可以取出哪一個,想表達的意思就是說不用等到所有的結果都出來再去取,可以輪詢著去取結果,因為你的任務需要執行的時間很長,那麼你需要等很久才能拿到結果,通過這樣的方式可以將快速出來的結果先拿出來。如果有的結果物件裡面還沒有執行結果,那麼你什麼也取不到,這一點要注意,不是空的,是什麼也取不到,那怎麼判斷我已經取出了哪一個的結果,可以通過列舉enumerate來搞,記錄你是哪一個位置的結果物件的結果已經被取過了,取過的就不再取了
    #結果分析: 列印的結果是沒有順序的,因為到了func函式中的sleep的時候執行緒會切換,誰先列印就沒準兒了,但是最後的我們通過結果物件取結果的時候拿到的是有序的,因為我們主執行緒進行for迴圈的時候,我們是按順序將結果物件新增到列表中的。
    # 6696列印的: 0
    # 5044列印的: 3
    # 4424列印的: 2
    # 8840列印的: 1
    # 1244列印的: 4
    # 主執行緒
    # >>> 0
    # >>> 1
    # >>> 4
    # >>> 9
    # >>> 16
    3. 執行緒池的簡單使用
    只需要將這一行程式碼改為下面這一行就可以了,其他的程式碼都不用變
    # tpool = ThreadPoolExecutor(max_workers=5) 
    tpool = ProcessPoolExecutor(max_workers=4)
    #預設一般起程序的資料不超過CPU個數
    4. map的使用
    import time, os, random, threading
    from concurrent.futures import ThreadPoolExecutor
    def work(n):
        print("%s is running"%threading.get_ident())
        time.sleep(random.randint(1,3))
        return n**2
    if __name__ == '__main__':
        t = ThreadPoolExecutor(max_workers=4)
        # for i in range(10):
        #     t.submit(work, i)
        # 用map取代submit
        s = t.map(work, range(5))
        print([i for i in s])
    # 5792 is running
    # 5824 is running
    # 9208 is running
    # 1296 is running
    # 5792 is running
    # [0, 1, 4, 9, 16]
    5. 回撥函式
五. 協成
    1. 背景
    對於單執行緒下,我們不可避免程式中出現io操作,但如果我們能在自己的程式中(即使用者程式級別,而非作業系統級別)控制單執行緒下的多個任務能在一個任務遇到io阻塞時就切換到另外一個任務去計算,這樣就保證了該執行緒能夠最大限度地處於就緒態,即隨時都可以被cpu執行的狀態,相當於我們在使用者程式級別將自己的io操作最大限度地隱藏起來,從而可以迷惑作業系統,讓其看到:該執行緒好像是一直在計算,io比較少,從而更多的將cpu的執行許可權分配給我們的執行緒。
    協程的本質就是在單執行緒下,由使用者自己控制一個任務遇到io阻塞了就切換另外一個任務去執行,以此來提升效率。為了實現它,我們需要找尋一種可以同時滿足以下條件的解決方案:
    1). 可以控制多個任務之間的切換,切換之前將任務的狀態儲存下來,以便重新執行時,可以基於暫停的位置繼續執行。
    2). 作為1的補充:可以檢測io操作,在遇到io操作的情況下才發生切換
    2. 協成介紹
    協程:是單執行緒下的併發,又稱微執行緒,纖程。英文名Coroutine。一句話說明什麼是執行緒:協程是一種使用者態的輕量級執行緒,即協程是由使用者程式自己控制排程的。、
    需要強調的是:
    1). python的執行緒屬於核心級別的,即由作業系統控制排程(如單執行緒遇到io或執行時間過長就會被迫交出cpu執行許可權,切換其他執行緒執行)
    2). 單執行緒內開啟協程,一旦遇到io,就會從應用程式級別(而非作業系統)控制切換,以此來提升效率(!!!非io操作的切換與效率無關)
    對比作業系統控制執行緒的切換,使用者在單執行緒內控制協程的切換
    優點如下:
    1). 協程的切換開銷更小,屬於程式級別的切換,作業系統完全感知不到,因而更加輕量級
    2). 單執行緒內就可以實現併發的效果,最大限度地利用cpu
    缺點如下:
    1). 協程的本質是單執行緒下,無法利用多核,可以是一個程式開啟多個程序,每個程序內開啟多個執行緒,每個執行緒內開啟協程
    2). 協程指的是單個執行緒,因而一旦協程出現阻塞,將會阻塞整個執行緒
    總結協程特點:
    1). 必須在只有一個單執行緒裡實現併發
    2). 修改共享資料不需加鎖
    3). 使用者程式裡自己儲存多個控制流的上下文棧
    4). 附加:一個協程遇到IO操作自動切換到其它協程(如何實現檢測IO,yield、greenlet都無法實現,就用到了gevent模組(select機制))
    3. greenlet
    如果我們在單個執行緒內有20個任務,要想實現在多個任務之間切換,使用yield生成器的方式過於麻煩(需要先得到初始化一次的生成器,然後再呼叫send。。。非常麻煩),而使用greenlet模組可以非常簡單地實現這20個任務直接的切換
    #真正的協程模組就是使用greenlet完成的切換
    from greenlet import greenlet
    
    def eat(name):
        print('%s eat 1' %name)  #2
        g2.switch('taibai')   #3
        print('%s eat 2' %name) #6
        g2.switch() #7
    def play(name):
        print('%s play 1' %name) #4
        g1.switch()      #5
        print('%s play 2' %name) #8
    
    g1=greenlet(eat)
    g2=greenlet(play)
    
    g1.switch('taibai')#可以在第一次switch時傳入引數,以後都不需要  1
    #單純的切換(在沒有io的情況下或者沒有重複開闢記憶體空間的操作),反而會降低程式的執行速度
    #順序執行
    import time
    def f1():
        res=1
        for i in range(100000000):
            res+=i
    
    def f2():
        res=1
        for i in range(100000000):
            res*=i
    
    start=time.time()
    f1()
    f2()
    stop=time.time()
    print('run time is %s' %(stop-start)) #10.985628366470337
    
    #切換
    from greenlet import greenlet
    import time
    def f1():
        res=1
        for i in range(100000000):
            res+=i
            g2.switch()
    
    def f2():
        res=1
        for i in range(100000000):
            res*=i
            g1.switch()
    
    start=time.time()
    g1=greenlet(f1)
    g2=greenlet(f2)
    g1.switch()
    stop=time.time()
    print('run time is %s' %(stop-start)) # 52.763017892837524
    greenlet只是提供了一種比generator更加便捷的切換方式,當切到一個任務執行時如果遇到io,那就原地阻塞,仍然是沒有解決遇到IO自動切換來提升效率的問題。
    雖然沒有規避固有的I/O時間,但是我們使用這個時間來做別的事情了,一般在工作中我們都是程序+執行緒+協程的方式來實現併發,以達到最好的併發效果,如果是4核的cpu,一般起5個程序,每個程序中20個執行緒(5倍cpu數量),每個執行緒可以起500個協程,大規模爬取頁面的時候,等待網路延遲的時間的時候,我們就可以用協程去實現併發。 併發數量 = 5 * 20 * 500 = 50000個併發,這是一般一個4cpu的機器最大的併發數。nginx在負載均衡的時候最大承載量就是5w個
    單執行緒裡的這20個任務的程式碼通常會既有計算操作又有阻塞操作,我們完全可以在執行任務1時遇到阻塞,就利用阻塞的時間去執行任務2。。。。如此,才能提高效率,這就用到了Gevent模組。
    4. gevent模組
    Gevent 是一個第三方庫,可以輕鬆通過gevent實現併發同步或非同步程式設計,在gevent中用到的主要模式是Greenlet, 它是以C擴充套件模組形式接入Python的輕量級協程。 Greenlet全部執行在主程式作業系統程序的內部,但它們被協作式地排程。
    1). 用法
    g1=gevent.spawn(func,1,2,3,x=4,y=5)建立一個協程物件g1,spawn括號內第一個引數是函式名,如eat,後面可以有多個引數,可以是位置實參或關鍵字實參,都是傳給函式eat的,spawn是非同步提交任務
    g2=gevent.spawn(func2)
    g1.join() #等待g1結束,上面只是建立協程物件,這個join才是去執行
    g2.join() #等待g2結束  有人測試的時候會發現,不寫第二個join也能執行g2,是的,協程幫你切換執行了,但是你會發現,如果g2裡面的任務執行的時間長,但是不寫join的話,就不會執行完等到g2剩下的任務了
    #或者上述兩步合作一步:gevent.joinall([g1,g2])
    g1.value#拿到func1的返回值
    2). 遇到I/O阻塞會自動切換任務
    import gevent

    def eat(name):
        print("%s eat 1"% name)
        gevent.sleep(2)
        print("%s eat 2"% name)
    
    def play(name):
        print("%s play 1" % name)
        gevent.sleep(1)
        print("%s play 2" % name)
    
    g1 = gevent.spawn(eat, "egon")
    g2 = gevent.spawn(play, "egon")
    
    g1.join()
    g2.join()
    # gevent.spawn([g1, g2])
    
    print("主")
    # egon eat 1
    # egon play 1
    # egon play 2
    # egon eat 2
    # 主
    上例gevent.sleep(2)模擬的是gevent可以識別的io阻塞,
    而time.sleep(2)或其他的阻塞,gevent是不能直接識別的需要用下面一行程式碼,打補丁,就可以識別了
    from gevent import monkey;monkey.patch_all()必須放到被打補丁者的前面,如time,socket模組之前
    或者我們乾脆記憶成:要用gevent,需要將from gevent import monkey;monkey.patch_all()放到檔案的開頭

    我們可以用threading.current_thread().getName()來檢視每個g1和g2,檢視的結果為DummyThread-n,即假執行緒,虛擬執行緒,其實都在一個執行緒裡面
    程序執行緒的任務切換是由作業系統自行切換的,你自己不能控制
    協程是通過自己的程式(程式碼)來進行切換的,自己能夠控制,只有遇到協程模組能夠識別的IO操作的時候,程式才會進行任務切換,實現併發效果,如果所有程式都沒有IO操作,那麼就基本屬於序列執行了。
    5. 協成子同步與非同步
    from gevent import spawn, joinall,monkey; monkey.patch_all()
    import time
    
    def task(pid):
        """
           Some non-deterministic task
        """
        time.sleep(0.5)
        print('Task %s done' % pid)
    
    # 同步
    def synchronous():
        for i in range(10):
            task(i)
    
    # spawn()非同步提交任務
    def asynchronous():
        g_l = [spawn(task, i) for i in range(10)]
        joinall(g_l)
    
    
    if __name__ == '__main__':
        print("Synchronous:")
        synchronous()
    
        print("Asynchronous:")
        asynchronous()
        # 上面程式的重要部分是將task函式封裝到Greenlet內部執行緒的gevent.spawn。 初始化的greenlet列表存放在陣列threads中,此陣列被傳給gevent.joinall 函式,後者阻塞當前流程,並執行所有給定的greenlet。執行流程只會在 所有greenlet執行完後才會繼續向下走。
    6. gevent應用之一: 爬蟲
    from gevent import monkey; monkey.patch_all()
    import gevent
    import requests
    import time
    
    def get_page(url):
        print("GET: %s"%url)
        response = requests.get(url)
        # print(response.status_code)
        if response.status_code == 200:
            print("%d bytes receved from %s"%(len(response.text), url))
            # print(response.text)
    
    s = time.time()
    gevent.joinall([
        gevent.spawn(get_page, 'https://www.python.org/'),
        gevent.spawn(get_page, 'https://www.yahoo.com/'),
        gevent.spawn(get_page, 'https://github.com/'),
    ])
    
    e = time.time()
    print("run time is %s" % (e-s))
    
    # GET: https://www.python.org/
    # GET: https://www.yahoo.com/
    # GET: https://github.com/
    # 48862 bytes receved from https://www.python.org/
    # 79878 bytes receved from https://github.com/
    # 518555 bytes receved from https://www.yahoo.com/
    # run time is 10.245655298233032
    
    # 將上面的程式最後加上一段序列的程式碼看看效率:如果你的程式不需要太高的效率,那就不用什麼併發啊協程啊之類的東西。
    
    print('--------------------------------')
    s = time.time()
    requests.get('https://www.python.org/')
    requests.get('https://www.yahoo.com/')
    requests.get('https://github.com/')
    t = time.time()
    print('序列時間>>',t-s)
    
    # --------------------------------
    # 序列時間>> 13.477648973464966
    7. gevent應用之二: 實現單執行緒下的socket併發
    通過gevent實現單執行緒下的socket併發(from gevent import monkey;monkey.patch_all()一定要放到匯入socket模組之前,否則gevent無法識別socket的阻塞)
    一個網路請求裡面經過多個時間延遲time

    1). 服務端:
    from gevent import monkey; monkey.patch_all()
    from socket import *
    import gevent
    
    #如果不想用money.patch_all()打補丁,可以用gevent自帶的socket
    # from gevent import socket
    # s=socket.socket()
    
    def server(server_ip, port):
        s = socket(AF_INET, SOCK_STREAM)
        s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        s.bind((server_ip, port))
        s.listen(5)
        while 1:
            conn, addr = s.accept()
            gevent.spawn(talk, conn, addr)
    
    def talk(conn, addr):
        try:
            while 1:
                res = conn.recv(1024).decode("utf-8")
                print("client %s:%s msg: %s"%(addr[0], addr[1], res))
                # msg = input(">>>: ").strip()
                msg = res.upper()
                conn.send(msg.encode("utf-8"))
        except Exception as e:
            print(e)
        finally:
            conn.close()
    
    if __name__ == '__main__':
        server("127.0.0.1", 8080)

    2). 客戶端
    from socket import *

    c = socket(AF_INET, SOCK_STREAM)
    c.connect(("127.0.0.1", 8080))
    
    while 1:
        msg = input(">>>: ").strip()
        if not msg:
            continue
    
        c.send(msg.encode("utf-8"))
        msg2 = c.recv(1024)
    
        print("客戶端: ", msg2.decode("utf-8"))

    3). 多執行緒併發多個客戶端,去請求上面的服務端是沒問題的
    from threading import Thread
    from socket import *
    import threading
    
    def client(server_ip, port):
        c = socket(AF_INET, SOCK_STREAM)
        c.connect((server_ip, port))
        count = 0
        while 1:
            c.send(("%s say hello %s" % (threading.current_thread().getName(), count)).encode("utf-8"))
            msg = c.recv(1024)
            print("server: ", msg.decode("utf-8"))
            count += 1
    
    if __name__ == '__main__':
        for i in range(500):
            t = Thread(target=client, args=("127.0.0.1", 8080))
            t.start()