1. 程式人生 > >python多線程學習一

python多線程學習一

finall == 函數 n) 方法 erp The 什麽 hat

本文希望達到的目標:

  1. 多線程的基本認識
  2. 多線程編程的模塊和類的使用
  3. Cpython的全局解釋器鎖GIL

一、多線程的基本認識

多線程編程的目的:並行處理子任務,大幅度地提升整個任務的效率。

同一個進程中的線程,共享相同的運行環境,共享同一片數據空間,所以線程間的通訊筆進程間的通信更簡單,但是這樣的共享是會有危險的,如果多線程共同訪問同一數據,因為訪問順序的不同,可能會導致結果不一致。

二、多線程編程的模塊和類的使用

為了更好說明多線程的優點以及多個標準庫使用的差異性,以模擬“一個程序完成2個獨立任務時間總和”為例子。

0、單進程單線程運行兩個獨立的任務:順序執行,完成第一個任務後,再完成第二個任務。總時間是各個循環 運行時間之和,實際兩個任務是完全獨立的,如果並行執行,是可以減少運行時間的。

import thread
from time import sleep,ctime

def loop0():
    print ‘start loop0‘,‘at:‘,ctime()
    sleep(4)
    print ‘loop0‘,‘done at:‘,ctime()

def loop1():
    print ‘start loop1‘,‘at:‘,ctime()
    sleep(3)
    print ‘loop1‘,‘done at:‘,ctime()

def main():
    print ‘starting at:‘,ctime()
    loop0()
    loop1() 
    print ‘all done at:‘,ctime()

if __name__==‘__main__‘:
    main()

技術分享圖片

1、thread模塊

python提供了兩個標準庫用於多線程編程,thread模塊提供了基本的線程和鎖的支持,而 threading 提供了更高級別,功能更強的線程管理的功能。一般都建議使用threading模塊,畢竟功能更強大,更好管理。

thread模塊和對象:(官網:https://docs.python.org/2/library/thread.html)

技術分享圖片

使用多線程編程,創建兩個線程同時執行兩個獨立的任務,需要考慮,主線程執行時間和子線程執行時間的關系,如果單純的創建線程去運行這2個任務,主線程執行完成時間必然比子線程快,子線程未運行完,主線程就已經退出了,在thread模塊使用鎖對象lock來管理,為每個線程創建一個鎖對象,在線程執行完成後釋放鎖,而主線程判斷所有的鎖都釋放後才能結束,進程間的通訊機制就這樣簡單的建立起來。

import thread
from time import sleep,ctime

loops =[4,3]

def loop(nloop,nsec,lock):
    print start loop,nloop,at:,ctime()
    sleep(nsec)
    print loop,nloop,done at:,ctime()
    lock.release()

def main():
    print starting at:,ctime()
    locks = []
    nloops = range(len(loops))

    for i in nloops:
        lock = thread.allocate_lock()
        lock.acquire()
        locks.append(lock)

    for i in nloops:
        thread.start_new_thread(loop,(i,loops[i],locks[i]))

    for i in nloops:
        print check lock
        while locks[i].locked():
            pass

    print all done at:,ctime()

if __name__==__main__:
    main()

技術分享圖片

運行時間為4s,單進程耗時7s,運行時間有減少。為什麽不在創建鎖的循環裏創建線程呢?有以下幾個原因:(1) 我 們想到實現線程的同步,所以要讓“所有的馬同時沖出柵欄”。(2) 獲取鎖要花一些時間,如果線程退出得“太快”,可能會導致還沒有獲得鎖,線程就已經結束了的情況。

註意:

A:子線程開始:創建對象調用start_new_thread函數時,該函數不是在主線程裏運行, 而是產生一個新的線程來運行這個函數。一旦調用該函數,子線程已經開始運行。

B:子線程退出:它不支持守護線程。當主線程退出時,所有的子線程不論它們是否還在工作,都會被強行退出。

2、threading模塊 :創建一個 Thread 的實例,傳給它一個函數

它不僅提供了 Thread 類,還提供了各 種非常好用的同步機制。

threading模塊和對象:(官網:https://docs.python.org/2/library/threading.html)

技術分享圖片

import threading
from time import sleep,ctime

loops =[4,3]

def loop(nloop,nsec):
    print start loop,nloop,at:,ctime()
    sleep(nsec)
    print loop,nloop,done at:,ctime()

def main():
    print starting at:,ctime()
    threads = []
    nloops = range(len(loops))

    for i in nloops:
        t = threading.Thread(target=loop,args=(i,loops[i]))
        threads.append(t)

    for i in nloops:
        print thread,i,start
        threads[i].start()

    for i in nloops:
        print thread,i,join
        threads[i].join()

    print all done at:,ctime()

if __name__==__main__:
    main()

技術分享圖片

註意:

A、子線程開始:調用start函數。所有的線程都創建了之後,再一起調用 start()函數啟動,而不是創建一個啟動一個。而且, 不用再管理一堆鎖(分配鎖,獲得鎖,釋放鎖,檢查鎖的狀態等)

B、子線程結束:可以控制子線程和主線程結束的順序,調用join(timeout=None) 程序掛起,直到線程結束;

C、守護線程一般是一個等待客戶請求的服務器, 如果沒有客戶提出請求,它就在那等著。如果設定一個線程為守護線程,就表示這個線程 是不重要的,在進程退出的時候,不用等待這個線程退出。

3、thread類 :Thread 派生出一個子類,創建一個這個子類的實例

import threading
from time import sleep,ctime

loops =(4,3)

class MyThread(threading.Thread):
    def __init__(self,func,args,name=‘‘):
        threading.Thread.__init__(self)
        self.name = name
        self.func = func
        self.args = args

    def run(self):
        self.func(*self.args)

def loop(nloop,nsec):
    print start loop,nloop,at:,ctime()
    sleep(nsec)
    print loop,nloop,done at:,ctime()

def main():
    print starting at:,ctime()
    threads = []
    nloops = range(len(loops))

    for i in nloops:
        t = MyThread(loop,(i,loops[i]),loop.__name__)
        threads.append(t)

    for i in nloops:
        print thread,i,start
        threads[i].start()
    for i in nloops:
        print thread,i,join
        threads[i].join()

    print all done at:,ctime()

if __name__==__main__:
    main()

4、threading模塊中的thread類部分源碼解析

thread模塊提供了一系列基礎函數,其實不是不能用,書本上寫著的是不建議使用,但是如果用於底層開發是可以的。threading模塊與之相比,最大的不同就是,threading模塊中的thread類的屬性特別多,

包含了對多線程的各自管理上的緯度屬性,所以特別方便使用,實際上threading模塊就是在thread模塊上開發的,做了進一步的集成化和封裝以便於用戶更輕便的管理。

A :threadding模塊有引用thread模塊:

try:
    import thread
except ImportError:
    del _sys.modules[__name__]
    raise

B: thread類的初始化函數部分截圖如下:初始化的過程,其實就是多線程的屬性的初始化的過程。把其中需要的資源,入參,thread管理的各自對象都初始化。

def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
      
        assert group is None, "group argument must be None for now"
        _Verbose.__init__(self, verbose)
        if kwargs is None:
            kwargs = {}
        self.__target = target
        self.__name = str(name or _newname())
        self.__args = args
        self.__kwargs = kwargs
        self.__daemonic = self._set_daemon()
        self.__ident = None
        self.__started = Event()
        self.__stopped = False
        self.__block = Condition(Lock())
        self.__initialized = True
        # sys.stderr is not stored in the class like
        # sys.exc_info since it can be changed between instances
        self.__stderr = _sys.stderr

C:thread類的start函數,看到調用底層的_start_new_thread函數,就明白了,為啥thread類是調用start函數來啟動線程,還調用了self.__started.wait(),__started對象實際是_Condition類的實例,這是一個對

線程鎖管理的實例,調用這個類的wait方法就是在獲取一把鎖。

 def start(self):
        if not self.__initialized:
            raise RuntimeError("thread.__init__() not called")
        if self.__started.is_set():
            raise RuntimeError("threads can only be started once")
        if __debug__:
            self._note("%s.start(): starting thread", self)
        with _active_limbo_lock:
            _limbo[self] = self
        try:
            _start_new_thread(self.__bootstrap, ())
        except Exception:
            with _active_limbo_lock:
                del _limbo[self]
            raise
        self.__started.wait()
 def wait(self, timeout=None):
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        waiter = _allocate_lock()
        waiter.acquire()
        self.__waiters.append(waiter)
        saved_state = self._release_save()
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                waiter.acquire()
                if __debug__:
                    self._note("%s.wait(): got it", self)
            else:
                # Balancing act:  We cant afford a pure busy loop, so we
                # have to sleep; but if we sleep the whole timeout time,
                # well be unresponsive.  The scheme here sleeps very
                # little at first, longer as time goes on, but never longer
                # than 20 times per second (or the timeout time remaining).
                endtime = _time() + timeout
                delay = 0.0005 # 500 us -> initial delay of 1 ms
                while True:
                    gotit = waiter.acquire(0)
                    if gotit:
                        break
                    remaining = endtime - _time()
                    if remaining <= 0:
                        break
                    delay = min(delay * 2, remaining, .05)
                    _sleep(delay)
                if not gotit:
                    if __debug__:
                        self._note("%s.wait(%s): timed out", self, timeout)
                    try:
                        self.__waiters.remove(waiter)
                    except ValueError:
                        pass
                else:
                    if __debug__:
                        self._note("%s.wait(%s): got it", self, timeout)
        finally:
            self._acquire_restore(saved_state)

D:而調用join方法,實際也是調用_Condition類的實例,判斷當前鎖的狀態,在線程運行完畢後,釋放鎖。

    def join(self, timeout=None):
        if not self.__initialized:
            raise RuntimeError("Thread.__init__() not called")
        if not self.__started.is_set():
            raise RuntimeError("cannot join thread before it is started")
        if self is current_thread():
            raise RuntimeError("cannot join current thread")

        if __debug__:
            if not self.__stopped:
                self._note("%s.join(): waiting until thread stops", self)
        self.__block.acquire()
        try:
            if timeout is None:
                while not self.__stopped:
                    self.__block.wait()
                if __debug__:
                    self._note("%s.join(): thread stopped", self)
            else:
                deadline = _time() + timeout
                while not self.__stopped:
                    delay = deadline - _time()
                    if delay <= 0:
                        if __debug__:
                            self._note("%s.join(): timed out", self)
                        break
                    self.__block.wait(delay)
                else:
                    if __debug__:
                        self._note("%s.join(): thread stopped", self)
        finally:
            self.__block.release()

三、Cpython的全局解釋器鎖GIL

推薦一篇更全面介紹的博客:https://www.cnblogs.com/frchen/p/5740606.html

GIL全稱 Global Interpreter Lock,GIL 並不是Python的特性,它是在實現Python解析器(CPython)時所引入的一個概念。實際現在一般使用的解析器都是基於CPython的,如果是Jpython(基於java),可能就不存在 這個問題。像單 CPU 的系統中運行多個進程那樣,內存中可以存放多個程序,但任意時刻,只有一個程序在 CPU 中運行。 在CPython 解釋器中可以“運行” 多個線程,但在任意時刻,只有一個線程在解釋器中運行。而對 Python 虛擬機的訪問由全局解釋器鎖(GIL)來控制,正是這個鎖能保證同一時刻只有一個線程在運行。

對所有面向 I/O 的(會調用內建的操作系統 C 代碼的)程序來說,GIL 會在這個 I/O 調用之前被釋放,以允許其它的線程在這個線程等待 I/O 的時候運行。如果某線程並未使用很多 I/O 操作, 它會在自己的時間片內一直占用處理器(和 GIL)。也就是說,I/O 密集型的 Python 程序比計算密集型的程序更能充分利用多線程環境的好處。

簡單的總結下就是:Python的多線程在多核CPU上,只對於IO密集型計算產生積極效果;而當有至少有一個CPU密集型線程存在,那麽多線程效率會由於GIL而大幅下降。

python多線程學習一