1. 程式人生 > >Python多線程編程

Python多線程編程

多線程、thread、生產者/消費者問題

一個串行程序需要從每個I/O終端通道來檢測用戶的輸入,然而程序在讀取過程中不能阻塞,因為用戶輸入的到達時間的不確定,並且阻塞會妨礙其他I/O通道的處理。由於串行程序只有唯一的執行線程,因此它需要兼顧執行的多個任務,確保其中的某個任務不會占用過多的時間,並對用戶的響應時間進行合理的分配。這種任務類型的串行程序的使用,往往造成非常復雜的控制流,難以維護。

多線程編程的本質就是異步,需要多個並發活動,每個活動的處理順序不確定,或者說隨機的。這種編程任務可以被組織或劃分成多個執行流,其中每個執行流都有一個指定要完成的任務。根據應用的不同,這些子任務可能需要計算出中間結果,然後合並為最終的輸出。使用多線程編程,以及類似的Queue的共享數據結構,這個編程任務可以規劃成幾個特定函數的線程。使用多線程編程來規劃這種編程任務可以降低程序的復雜度,使其實現更加清晰、高校,簡潔。

  • 進程與線程

計算機程序只是存儲在磁盤上的可執行二進制文件。只有把它們加載到內存中並操作系統調用,才能擁有其生命周期。進程則是一個執行中的程序。每個進程都有自己的地址空間、內存、數據棧以及其他用於跟蹤執行的輔助數據。進程可以通過派生(fork或spawn)新的進程來執行其他任務,但是因為每個新進程也擁有自己的內存和數據棧等,所以只能采用進程間通信(IPC)的方式共享信息。

線程與進程類似,不過它們是在同一進程下執行的,並共享相同的上下文。一個進程中的各個線程與主線程共享同一片數據空間,因此相比於獨立的進程而言,線程間的共享和通信更加容易。線程一般以並行方式執行,正是由於這種並行和數據共享,是的多任務的協作成為可能。但是線程建的數據共享可能引起多個線程訪問同一片數據造成競態條件,而且多個線程無法給與公平的執行時間。

  • 全局解釋鎖

Python的代碼執行是由Python虛擬機(解釋器主循環)進行控制。在主循環中同時只有一個控制線程在執行,就像單核CPU系統中的多線程一樣。內存中可以有許多程序,但是任意給定的時刻只能有一個程序在運行。同理,盡管Python解釋其中可以運行多個線程,但任意時刻只有一個線程會被解釋器執行。

對Python虛擬機的訪問是由全局解釋鎖(GIL)控制的。這個鎖就是用來保證同時只能有一個線程運行。在多線程環境中,Python虛擬機將按照以下方式執行:

1.設置GIL

2.切換到一個線程去運行

3.運行:
a. 指定數量的字節碼指令

b. 線程主動讓出控制(調用time.sleep(0))

4.把線程設置為睡眠狀態

5.解鎖GIL
6.再次重復以上所有步驟

  • Python中的threading模塊

Python提供了多個模塊來支持來支持多線程編程,包括thread、threading和Queue模塊等。然而建議避免使用thread模塊,threading模塊更加先進,有更好的線程支持,並且thread模塊中一些屬性會和threading沖突,另外低級別的thread模塊擁有的同步原語和很少。更重要的是,在Python3中已經沒有thread模塊。


thread模塊和鎖對象

thread模塊的函數

start_new_thread(function, args[, kwargs])派生一個新的線程
allocate_lock()分配LockType鎖對象
exit()線程退出

LockType鎖對象的方法

acquire(wait=None)嘗試獲取鎖對象
locked()如果獲取鎖對象返回True,否則False
release()釋放鎖

threading模塊

threading模塊的對象


對象描述
Thread一個線程的執行對象
Lock鎖原語對象
RLock可重入鎖對象,使單一線程(再次)獲得已持有的鎖(遞歸鎖)
Condition條件變量對象,使得一個線程等待另一個線程滿足特定的條件
Event
條件變量的通用版本,任意數量的線程等待某個事件的發生,該事件發生後所有線程將激活
Semaphore

為線程間共享的有限資源提供‘計數器‘,如果沒有可用資源時會被阻塞

BoundedSemaphore
與Semaphore相似,但它不允許超過初始值
Timer
與Thread相似,在運行前需要等待一段時間
Barrier
創建一個‘障礙‘,必須達到指定數量線程後才可以繼續

Thread對象數據屬性

name線程名
ident線程標識符
daemon布爾標誌,表示這個線程是否是守護線程

Thread對象方法

__init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None)實例化一個線程
start(self)開始執行該線程
run(self)定義線程功能的方法
join(self, timeout=None)直至啟動的線程終止之前一直掛起,除非給出了timeout,否則一直阻塞
getName(self)返回線程名
setName(self, name)設定線程名
isAlive()/is_alive(self)布爾標誌,表示線程是否還存活
isDaemon(self)如果是守護線程,返回True
setDaemon(self, daemonic)把線程的守護標誌設定為deamonic(必須在線程start()之前調用)

使用Thread類,可以有很多方法創建線程,有三種常用的方法:

創建Thread的實例,傳給它一個函數

創建Thread的實例,傳給它一個可調用的類實例

派生Thread的子類,並創建子類的實例

創建Thread的實例,傳給它一個函數:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
‘‘‘
join()方法,可以等待所有線程執行完畢
‘‘‘
import threading
from time import sleep,ctime
loops = [4,2]
def loop(nloop,nsec):
    print(‘start loop‘,nloop,‘at:‘,ctime())
    sleep(nsec)
    print(‘loop‘,nloop,‘done at:‘,ctime())
def main():
    print(‘starting at:‘,ctime())
    threads = []
    nloops = range(len(loops))
    for i in nloops:
        t = threading.Thread(target=loop,args=(i,loops[i]))
        threads.append(t)
    for i in nloops:
        threads[i].start()
    for i in nloops:
        threads[i].join()
    print(‘all DONE at:‘,ctime())
if __name__ == ‘__main__‘:
    main()

運行結果:
starting at: Wed May 10 12:30:28 2017
start loop 0 at: Wed May 10 12:30:28 2017
start loop 1 at: Wed May 10 12:30:28 2017
loop 1 done at: Wed May 10 12:30:30 2017
loop 0 done at: Wed May 10 12:30:32 2017
all DONE at: Wed May 10 12:30:32 2017

創建Thread的實例,傳給它一個可調用的類實例:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
from time import sleep,ctime
loops = [4,2]
class ThreadFunc(object):
    def __init__(self,func,args,name=‘‘):
        self.name = name
        self.func = func
        self.args = args
    def __call__(self):
        self.func(*self.args)
def loop(nloop,nsec):
    print(‘start loop‘,nloop,‘at:‘,ctime())
    sleep(nsec)
    print(‘loop‘,nloop,‘done at:‘,ctime())
def main():
    print(‘starting at:‘,ctime())
    threads = []
    nloops = range(len(loops))
    for i in nloops:
        t = threading.Thread(target=ThreadFunc(loop,(i,loops[i]),loop.__name__))
        threads.append(t)
    for i in nloops:
        threads[i].start()
    for i in nloops:
        threads[i].join()
    print(‘all DONE at:‘,ctime())
if __name__ == ‘__main__‘:
    main()
運行結果:
starting at: Wed May 10 12:31:23 2017
start loop 0 at: Wed May 10 12:31:23 2017
start loop 1 at: Wed May 10 12:31:23 2017
loop 1 done at: Wed May 10 12:31:25 2017
loop 0 done at: Wed May 10 12:31:27 2017
all DONE at: Wed May 10 12:31:27 2017

派生Thread的子類,並創建子類的實例

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import threading
from time import sleep,ctime

loops = (4,2)
class MyThread(threading.Thread):
    def __init__(self,func,args,name=‘‘):
        threading.Thread.__init__(self)
        self.name = name
        self.func = func
        self.args = args
    def run(self):
        self.func(*self.args)
def loop(nloop,nsec):
    print(‘start loop‘,nloop,‘at:‘,ctime())
    sleep(nsec)
    print(‘loop‘,nloop,‘done at:‘,ctime())

def main():
    print(‘starting at:‘,ctime())
    threads = []
    nloops = range(len(loops))

    for i in nloops:
        t = MyThread(loop,(i,loops[i]),loop.__name__)
        threads.append(t)
    for i in nloops:
        threads[i].start()
    for i in nloops:
        threads[i].join()
    print(‘all DONE at:‘,ctime())

if __name__ == ‘__main__‘:
    main()
運行結果:
starting at: Wed May 10 10:43:10 2017
start loop 0 at: Wed May 10 10:43:10 2017
start loop 1 at: Wed May 10 10:43:10 2017
loop 1 done at: Wed May 10 10:43:12 2017
loop 0 done at: Wed May 10 10:43:14 2017
all DONE at: Wed May 10 10:43:14 2017

鎖的使用

鎖有兩種狀態:鎖定和未鎖定。而且它也只支持兩個函數,獲得鎖和釋放鎖。當多線程爭奪鎖時,允許第一個獲得鎖的線程進入臨界區,並執行代碼。所有之後到達的線程將被阻塞,直到第一個線程之行結束,退出臨界區,並釋放鎖。此時,其他等待的線程可以獲得鎖並進入臨界區,不過那些被阻塞的線程進入臨界區沒有先後順序,根據Python實現不同而有所區別。

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from atexit import register
from  random import randrange
from threading import Thread,Lock,current_thread
from time import sleep,ctime

class CleanOutputSet(set):
    def __str__(self):
        return ‘,‘.join(x for x in self)
lock = Lock()
loops = (randrange(2,5) for x in range(randrange(3,7)))
remaining = CleanOutputSet()

def loop(nsec):
    myname = current_thread().name
    lock.acquire()
    remaining.add(myname)
    print(‘[%s] started %s‘ %(ctime(),myname))
    lock.release()
    sleep(nsec)
    lock.acquire()
    remaining.remove(myname)
    print(‘[%s] completed %s (%d secs)‘ %(ctime(),myname,nsec))
    print(‘(remaining:%s)‘ %(remaining or ‘None‘))
    lock.release()

def main():
    for pause in loops:
        Thread(target=loop,args=(pause,)).start()

@register
def _atexit():
    print(‘all DONE at:‘,ctime())

if __name__ == ‘__main__‘:
    main()
    
運行結果:
[Wed May 10 11:20:14 2017] started Thread-1
[Wed May 10 11:20:14 2017] started Thread-2
[Wed May 10 11:20:14 2017] started Thread-3
[Wed May 10 11:20:16 2017] completed Thread-3 (2 secs)
(remaining:Thread-2,Thread-1)
[Wed May 10 11:20:16 2017] completed Thread-2 (2 secs)
(remaining:Thread-1)
[Wed May 10 11:20:18 2017] completed Thread-1 (4 secs)
(remaining:None)
all DONE at: Wed May 10 11:20:18 2017

信號量的使用

信號量是最古老的同步原語之一。它是一個計數器,當資源消耗時遞減,當資源釋放時遞增。信號量比鎖更加靈活,因為可以有多個線程,每個線程擁有有限資源的一個實例。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from atexit import register
from random import randrange
from threading import BoundedSemaphore,Lock,Thread
from time import sleep,ctime

lock = Lock()
MAX = 5
candytray = BoundedSemaphore(MAX)

def refill():
    lock.acquire()
    print(‘Refilling candy...‘)
    try:
        candytray.release()
    except ValueError:
        print(‘full,skipping‘)
    else:
        print(‘OK‘)
    lock.release()

def buy():
    lock.acquire()
    print(‘Buying candy...‘)
    if candytray.acquire(False):
        print(‘OK‘)
    else:
        print(‘empty,skipping‘)
    lock.release()

def producer(loops):
    for i in range(loops):
        refill()
        sleep(randrange(3))
def consumer(loops):
    for i in range(loops):
        buy()
        sleep(randrange(3))

def main():
    print(‘starting at:‘,ctime())
    nloops = randrange(2,6)
    print(‘THE CANDY MACHINE ( full with %d bars)‘%MAX)
    Thread(target=consumer,args=(randrange(nloops,nloops+MAX+2),)).start()
    Thread(target=producer,args=(nloops,)).start()

@register
def _atexit():
    print(‘all DONE at:‘,ctime())

if __name__ == ‘__main__‘:
    main()
    
運行結果:
starting at: Wed May 10 11:42:34 2017
THE CANDY MACHINE ( full with 5 bars)
Buying candy...
OK
Refilling candy...
OK
Refilling candy...
full,skipping
Buying candy...
OK
Refilling candy...
OK
Refilling candy...
full,skipping
Buying candy...
OK
Refilling candy...
OK
Buying candy...
OK
Buying candy...
OK
Buying candy...
OK
Buying candy...
OK
Buying candy...
OK
Buying candy...
empty,skipping
Buying candy...
empty,skipping
Buying candy...
empty,skipping
all DONE at: Wed May 10 11:42:44 2017
  • 生產者-消費者問題和Queue/queue

queue模塊的類

Queue(maxsize=0)創建一個先入先出隊列。如果給定最大值,在隊列沒有空間時阻塞,否則為無限隊列
LifoQueue(maxsize=0)創建一個後入先出隊列。如果給定最大值,在隊列沒有空間時阻塞,否則為無限隊列
PriorityQueue(maxsize=0)創建一個優先級隊列。如果給定最大值,在隊列沒有空間時阻塞,否則為無限隊列

queue異常

Empty當對空隊列調用get()方法時拋出異常
Full當對滿隊列調用put()方法時拋出異常

queue對象方法

qsize()返回隊列大小
empty()如果隊列為空,則返回True
full()如果隊列為滿,則返回True
put(item,block=True,timeout=None)將item放入隊列,如果block為True且timeout為None,則在有可調用空間之前阻塞;如果timeout為正值 ,則最多阻塞timeout秒,如果block為False,則拋出Empty異常
put_nowait(item)
和put(item,False)相同
get(block=True,timeout=None)從隊列中取得元素,如果給定block(非0),則一直阻塞到有可用元素為止
get_nowait(item)和get(False)相同
task_done()

表示隊列中的某個元素已經完成,該方法會被

join()使用

join()
在隊列所有元素執行完畢並調用task_done信號之前, 保持阻塞。
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import threading
import time
from queue import Queue
import random
class consumer(threading.Thread):
    def __init__(self, que):
        threading.Thread.__init__(self)
        self.queue = que
    def run(self):
        while True:
            if self.queue.empty():
                break
            item = self.queue.get()
            #processing the item
            time.sleep(item)
            print(self.name,item)
            self.queue.task_done()
        return
que = Queue()
for x in range(10):
    number = random.randint(1, 5)
    print(‘random %d number is %d:‘ % (x, number))
    que.put(number, True, None)
print(‘queue is:‘, que.queue)
consumers = [consumer(que) for x in range(5)]

for c in consumers:
    c.start()
que.join()

運行結果:
random 0 number is 5:
random 1 number is 1:
random 2 number is 2:
random 3 number is 1:
random 4 number is 5:
random 5 number is 3:
random 6 number is 3:
random 7 number is 2:
random 8 number is 2:
random 9 number is 2:
queue is: deque([5, 1, 2, 1, 5, 3, 3, 2, 2, 2])
Thread-2 1
Thread-4 1
Thread-3 2
Thread-3 2
Thread-2 3
Thread-4 3
Thread-1 5
Thread-5 5
Thread-3 2
Thread-2 2


本文出自 “隨風而飄” 博客,請務必保留此出處http://yinsuifeng.blog.51cto.com/10173491/1924054

Python多線程編程