1. 程式人生 > >day21&22&23:線程、進程、協程

day21&22&23:線程、進程、協程

根據 res 簡單 pytho inf bubuko nat empty 計算機

1、程序工作原理

技術分享圖片

進程的限制:每一個時刻只能有一個線程來工作。
多進程的優點:同時利用多個cpu,能夠同時進行多個操作。缺點:對內存消耗比較高
當進程數多於cpu數量的時候會導致不能被調用,進程不是越多越好,cpu與進程數量相等最好
線程:java和C# 對於一個進程裏面的多個線程,cpu都在同一個時刻能使用。py同一時刻只能調用一個。
so:對於型的應用,py效率較java C#低。
多線程優點:共享進程的內存,可以創造並發操作。缺點:搶占資源,
多線程得時候系統在調用的時候需要記錄請求上下文的信息,請求上下文的切換 這個過程非常耗時。因此 線程不是越多越好,具體案例具體分析。
在計算機中,執行任務的最小單元就是線程
IO操作不利用CPU,IO密集型操作適合多線程,對於計算密集型適合多進程
GIL:全局解釋器鎖,PY特有它會在每個進程上加個鎖
系統存在進程和線程的目的是為了提高效率
1.1、單進程單線程
1.2、自定義線程:
主進程
主線程
子線程
2、線程鎖 threading.RLock和threading.Lock

多線程修改一個數據得時候可能會造成咱數據。建議使用rlock

3、線程時間:threading.Event: 通知
當有進程間的通訊的情況下這個才有應用場景。汽車類比線程,Event.wait()紅燈,Event.set()綠燈,Event.clear():使紅燈變綠

even是線程間的通訊機制。Event.wait([timeout]):賭賽線程,知道event對象內部標示位被設置為True或超時時間。Event.set():將標識位設為True。Event.clear():標識位設為False。Event.isSet():判斷標識位是否為True。

4、queue模塊:生產者-消費者模型

技術分享圖片
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import queue
import threading
# import queue

# q = queue.Queue(maxsize=0)  # 構造一個先進顯出隊列,maxsize指定隊列長度,為0 時,表示隊列長度無限制。
#
# q.join()    # 等到隊列為空的時候,在執行別的操作
# q.qsize()   # 返回隊列的大小 (不可靠)
# q.empty()   # 當隊列為空的時候,返回True 否則返回False (不可靠)
# q.full()    # 當隊列滿的時候,返回True,否則返回False (不可靠)
# q.put(item, block=True, timeout=None) # 將item放入Queue尾部,item必須存在,可以參數block默認為True,表示當隊列滿時,會等待隊列給出可用位置, # #                          為False時為非阻塞,此時如果隊列已滿,會引發queue.Full 異常。 可選參數timeout,表示 會阻塞設置的時間,過後, # #                          如果隊列無法給出放入item的位置,則引發 queue.Full 異常 # q.get(block=True, timeout=None) # 移除並返回隊列頭部的一個值,可選參數block默認為True,表示獲取值的時候,如果隊列為空,則阻塞,為False時,不阻塞, # #                       若此時隊列為空,則引發 queue.Empty異常。 可選參數timeout,表示會阻塞設置的時候,過後,如果隊列為空,則引發Empty異常。 # q.put_nowait(item) # 等效於 put(item,block=False) # q.get_nowait() # 等效於 get(item,block=False) message = queue.Queue(10) def producer(i): print("put:",i) # while True: message.put(i) def consumer(i): # while True: msg = message.get() print(msg) for i in range(12): t = threading.Thread(target=producer, args=(i,)) t.start() for i in range(10): t = threading.Thread(target=consumer, args=(i,)) t.start() qs = message.qsize() print("當前消息隊列的長度為:%d"%(qs)) print("當前消息隊列的長度為:",qs)
queue示例代碼

join()方法主線程等待,最多等待時間可以hi設置,eg:t.join(2)

技術分享圖片
    import threading
    def f0():
        pass
    def f1(a1,a2):
        time.sleep(10)
        f0()
    t = threading.Thread(target=f1,args(111,222,))
    
    t.setDaemon(True)  #默認false 主線程將等待執行完成後結束,設置為true後主線程將不在等待
    t.start()
    t = threading.Thread(target=f1,args(111,222,))
    t.start()
    t = threading.Thread(target=f1,args(111,222,))
    t.start()
    t = threading.Thread(target=f1,args(111,222,))

t.start()
threading demo

5、進程 :multiprocess是py進程模塊

進程之間默認是隔離得,線程的資源默認是共享的

兩個進程共享數據需要使用特殊得對象: array:其他語音 或manager.dict()

進程不是,越多越好,建議使用線程池來控制。

技術分享圖片
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Pool
import time
def myFun(i):
    time.sleep(2)
    return i+100

def end_call(arg):
    print("end_call",arg)


# print(p.map(myFun,range(10)))
if __name__ == "__main__":
    p = Pool(5)

    for i in range(10):
        p.apply_async(func=myFun,args=(i,),callback=end_call)

    print("end")
    p.close()
    p.join()
porcesspooldemo 技術分享圖片
#!/usr/bin/env python
# -*- coding:utf-8 -*-

from multiprocessing import  Pool
import time

def f1(a):
    time.sleep(1)
    print(a)
    return 1000
def f2(arg):
    print(arg)

if __name__ =="__main__":
    pool = Pool(5)

    for i in range(50):
        pool.apply_async(func=f1, args=(i,),callback=f2)
        # pool.apply(func=f1, args=(i,))
        print(<<=================>>)
    pool.close()
    pool.join()
processpooldemo2

6、線程池py沒有提供,我們需要自己編寫

簡單線程池示例:

技術分享圖片
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import queue
import threading
import time

class ThreadPool(object):

    def __init__(self, max_num=20):
        self.queue = queue.Queue(max_num)
        for i in range(max_num):
            self.queue.put(threading.Thread)

    def get_thread(self):
        return self.queue.get()

    def add_thread(self):
        self.queue.put(threading.Thread)

def func(pool,a1):
    time.sleep(2)
    print(a1)
    pool.add_thread()

p = ThreadPool(10)

for i in range(100):
    #獲得類
    thread = p.get_thread()
    #對象 = 類()
    #
    t = thread(target=func,args=(p,i,))
    t.start()
"""
pool = ThreadPool(10)

def func(arg, p):
    print arg
    import time
    time.sleep(2)
    p.add_thread()

for i in xrange(30):
    thread = pool.get_thread()
    t = thread(target=func, args=(i, pool))
    t.start()
"""

# p = ThreadPool()
# ret = p.get_thread()
#
# t = ret(target=func,)
# t.start()
View Code

復雜的線城池示例:

技術分享圖片
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import queue
import threading
import contextlib
import time

StopEvent = object()


class ThreadPool(object):

    def __init__(self, max_num, max_task_num = None):
        if max_task_num:
            self.q = queue.Queue(max_task_num)
        else:
            self.q = queue.Queue()
         # 多大容量
        self.max_num = max_num
        self.cancel = False
        self.terminal = False
        # 真實創建的線程列表
        self.generate_list = []
        # 空閑線程數量
        self.free_list = []

    def run(self, func, args, callback=None):
        """
        線程池執行一個任務
        :param func: 任務函數
        :param args: 任務函數所需參數
        :param callback: 任務執行失敗或成功後執行的回調函數,回調函數有兩個參數1、任務函數執行狀態;2、任務函數返回值(默認為None,即:不執行回調函數)
        :return: 如果線程池已經終止,則返回True否則None
        """
        if self.cancel:
            return
        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
            self.generate_thread()
        w = (func, args, callback,)
        self.q.put(w)

    def generate_thread(self):
        """
        創建一個線程
        """
        t = threading.Thread(target=self.call)
        t.start()

    def call(self):
        """
        循環去獲取任務函數並執行任務函數
        """
        # 獲取當前進程
        current_thread = threading.currentThread()
        self.generate_list.append(current_thread)

        # 取任務
        event = self.q.get()
        while event != StopEvent:
            # 是元組=》是任務
            # 解開任務包
            # 執行任務

            func, arguments, callback = event
            try:
                result = func(*arguments)
                success = True
            except Exception as e:
                success = False
                result = None

            if callback is not None:
                try:
                    callback(success, result)
                except Exception as e:
                    pass

            with self.worker_state(self.free_list, current_thread):
                if self.terminal:
                    event = StopEvent
                else:
                    event = self.q.get()
        else:

            # 不是元組,不是任務
            # 標記:我空閑了
            # 執行後線程死掉
            self.generate_list.remove(current_thread)

    def close(self):
        """
        執行完所有的任務後,所有線程停止
        """
        self.cancel = True
        full_size = len(self.generate_list)
        while full_size:
            self.q.put(StopEvent)
            full_size -= 1

    def terminate(self):
        """
        無論是否還有任務,終止線程
        """
        self.terminal = True

        while self.generate_list:
            self.q.put(StopEvent)

        self.q.empty()

    @contextlib.contextmanager
    def worker_state(self, state_list, worker_thread):
        """
        用於記錄線程中正在等待的線程數
        """
        state_list.append(worker_thread)
        try:
            yield
        finally:
            state_list.remove(worker_thread)



# How to use


pool = ThreadPool(5)

def callback(status, result):
    # status, execute action status
    # result, execute action return value
    pass


def action(i):
    print(i)

for i in range(30):
    #將任務放在隊列
    #著手開始處理任務
    #創建線程(有空閑線程則不創建;不高於線程池的限制;根據任務個數判斷)  =》線程去隊列中去任務
    ret = pool.run(action, (i,), callback)

time.sleep(5)
print(len(pool.generate_list), len(pool.free_list))
print(len(pool.generate_list), len(pool.free_list))
# pool.close()
# pool.terminate()
View Code

end

day21&22&23:線程、進程、協程