多線程實踐—Python多線程編程
前面的一些文章和腳本都是只能做學習多線程的原理使用,實際上什麽有用的事情也沒有做。接下來進行多線程的實踐,看一看在實際項目中是怎麽使用多線程的。
圖書排名示例
Bookrank.py:
該腳本通過單線程進行下載圖書排名信息的調用
?
1 from atexit import register 2 from re import compile 3 from threading import Thread 4 from time import sleep, ctime 5 import requests 6 ? 7 REGEX = compile(‘#([\d,]+) in Books‘) 8 AMZN = ‘https://www.amazon.com/dp/‘ 9 ISBNS = { 10 ‘0132269937‘: ‘Core Python Programming‘, 11 ‘0132356139‘: ‘Python Web Development with Django‘, 12 ‘0137143419‘: ‘Python Fundamentals‘, 13 } 14 ? 15 def getRanking(isbn): 16 url = ‘%s%s‘ % (AMZN, isbn) 17 page = requests.get(url)18 data = page.text 19 return REGEX.findall(data)[0] 20 ? 21 def _showRanking(isbn): 22 print ‘- %r ranked %s‘ % ( 23 ISBNS[isbn], getRanking(isbn)) 24 ? 25 def _main(): 26 print ‘At‘, ctime(), ‘on Amazon‘ 27 for isbn in ISBNS: 28 _showRanking(isbn) 29 ? 30 @register31 def _atexit(): 32 print ‘all DONE at:‘, ctime() 33 ? 34 if __name__ == ‘__main__‘: 35 _main() 36 ?
輸出結果為:
1 /usr/bin/python ~/Test_Temporary/bookrank.py 2 At Sat Jul 28 17:16:51 2018 on Amazon 3 - ‘Core Python Programming‘ ranked 322,656 4 - ‘Python Fundamentals‘ ranked 4,739,537 5 - ‘Python Web Development with Django‘ ranked 1,430,855 6 all DONE at: Sat Jul 28 17:17:08 2018
引入線程
上面的例子只是一個單線程程序,下面引入線程,並使用多線程再執行程序對比各自所需的時間。
? 將上面腳本中 _main() 函數的 _showRanking(isbn)
修改以下代碼:
?
Thread(target=_showRanking, args=(isbn,)).start()
再次執行查看返回結果:
1 /usr/bin/python ~/Test_Temporary/bookrank.py 2 At Sat Jul 28 17:39:16 2018 on Amazon 3 - ‘Python Fundamentals‘ ranked 4,739,537 4 - ‘Python Web Development with Django‘ ranked 1,430,855 5 - ‘Core Python Programming‘ ranked 322,656 6 all DONE at: Sat Jul 28 17:39:19 2018
從兩個的輸出結果中可以看出,使用單線程時總體完成的時間為 7s ,而使用多線程時,總體完成時間為 3s 。另外一個需要註意的是,單線程版本是按照變量的順序輸出,而多線程版本按照完成的順序輸出。
同步原語
一般在多線程代碼中,總會有一些特定的函數或代碼塊不希望(或不應該)被多個線程同時執行,通常包括修改數據庫、更新文件或其它會產生竟態條件的類似情況。這就是需要使用同步的情況。
-
當任意數量的線程可以訪問臨界區的代碼,但給定的時刻只有一個線程可以通過時,就是使用同步的時候了;
-
程序員選擇適合的同步原語,或者線程控制機制來執行同步;
-
進程同步有不同的類型【參見:https://en.wikipedia.org/wiki/Synchronization_(computer_science) 】
-
同步原語有:鎖/互斥、信號量。鎖是最簡單、最低級的機制,而信號量用於多線程競爭有限資源的情況。
鎖示例
鎖有兩種狀態:鎖定和未鎖定。而且它也只支持兩個函數:獲得鎖和釋放鎖。
-
當多線程爭奪鎖時,允許第一個獲得鎖的線程進入臨界區,並執行代碼;
-
所有之後到達的線程將被阻塞,直到第一個線程結束退出臨界區並釋放鎖;
-
鎖被釋放後,其它等待的線程可以繼續爭奪鎖,並進入臨界區;
-
被阻塞的線程沒有順序,不會先到先得,勝出的線程是不確定的。
代碼示例(mtsleepF.py):
*註:該腳本派生了隨機數量的線程,每個線程執行結束時會進行輸出
1 # -*- coding=utf-8 -*- 2 from atexit import register 3 from random import randrange 4 from threading import Thread, currentThread 5 from time import sleep, ctime 6 ? 7 class CleanOutputSet(set): 8 def __str__(self): 9 return ‘, ‘.join(x for x in self) 10 ? 11 loops = (randrange(2, 5) for x in range(randrange(3, 7))) 12 remaining = CleanOutputSet() 13 ? 14 def loop(nsec): 15 myname = currentThread().name 16 remaining.add(myname) 17 print(‘這個是目前線程池中的線程:‘, remaining) 18 print(‘[%s] Started %s‘ % (ctime(), myname)) 19 sleep(nsec) 20 remaining.remove(myname) 21 print(‘[%s] Completed %s (%d secs)‘ % (ctime(), myname, nsec)) 22 print(‘ (remaining: %s)‘ % (remaining or ‘None‘)) 23 ? 24 def _main(): 25 for pause in loops: 26 Thread(target=loop, args=(pause,)).start() 27 ? 28 @register 29 def _atexit(): 30 print(‘all DONE at:%s‘ % ctime()) 31 ? 32 if __name__ == ‘__main__‘: 33 _main()
執行後的輸出結果:
1 /usr/local/bin/python3.6 /Users/zhenggougou/Project/Test_Temporary/mtsleepF.py 2 這個是目前線程池中的線程: Thread-1 3 [Sat Jul 28 21:09:44 2018] Started Thread-1 4 這個是目前線程池中的線程: Thread-2, Thread-1 5 [Sat Jul 28 21:09:44 2018] Started Thread-2 6 這個是目前線程池中的線程: Thread-3, Thread-2, Thread-1 7 [Sat Jul 28 21:09:44 2018] Started Thread-3 8 這個是目前線程池中的線程: Thread-3, Thread-2, Thread-4, Thread-1 9 [Sat Jul 28 21:09:44 2018] Started Thread-4 10 這個是目前線程池中的線程: Thread-5, Thread-4, Thread-3, Thread-2, Thread-1 11 [Sat Jul 28 21:09:44 2018] Started Thread-5 12 這個是目前線程池中的線程: Thread-5, Thread-6, Thread-4, Thread-3, Thread-2, Thread-1 13 [Sat Jul 28 21:09:44 2018] Started Thread-6 14 [Sat Jul 28 21:09:46 2018] Completed Thread-2 (2 secs) 15 [Sat Jul 28 21:09:46 2018] Completed Thread-1 (2 secs) 16 [Sat Jul 28 21:09:46 2018] Completed Thread-3 (2 secs) 17 (remaining: Thread-5, Thread-6, Thread-4) 18 [Sat Jul 28 21:09:46 2018] Completed Thread-6 (2 secs) 19 (remaining: Thread-5, Thread-4) 20 [Sat Jul 28 21:09:46 2018] Completed Thread-4 (2 secs) 21 (remaining: Thread-5) 22 (remaining: Thread-5) 23 [Sat Jul 28 21:09:46 2018] Completed Thread-5 (2 secs) 24 (remaining: None) 25 (remaining: None) 26 all DONE at:Sat Jul 28 21:09:46 2018
從執行結果中可以看出,有的時候可能會存在多個線程並行執行操作刪除 remaining 集合中數據的情況。比如上面結果中,線程1、2、3 就是同時執行去刪除集合中數據的。所以為了避免這種情況需要加鎖,通過引入 Lock (或 RLock),然後創建一個鎖對象來保證數據的修改每次只有一個線程能操作。
-
首先先導入鎖類,然後創建鎖對象
from threading import Thread, Lock, currentThread
lock = Lock()
-
然後使用創建的鎖,將上面 mtsleepF.py 腳本中 loop() 函數做以下改變:
1 def loop(nsec): 2 myname = currentThread().name 3 lock.acquire() # 獲取鎖 4 remaining.add(myname) 5 print(‘這個是目前線程池中的線程:‘, remaining) 6 print(‘[%s] Started %s‘ % (ctime(), myname)) 7 lock.release() # 釋放鎖 8 sleep(nsec) 9 lock.acquire() # 獲取鎖 10 remaining.remove(myname) 11 print(‘[%s] Completed %s (%d secs)‘ % (ctime(), myname, nsec)) 12 print(‘ (remaining: %s)‘ % (remaining or ‘None‘)) 13 lock.release() # 釋放鎖
在操作變量的前後需要進行獲取鎖和釋放鎖的操作,以保證在修改變量時只有一個線程進行。上面的代碼有兩處修改變量,一是:remaining.add(myname)
,二是:remaining.remove(myname)
。 所以上面代碼中有兩次獲取鎖和釋放鎖的操作。其實還有一種方案可以不再調用鎖的 acquire()
和 release()
方法,二是使用上下文管理,進一步簡化代碼。代碼如下:
1 def loop(nesc): 2 myname = currentThread().name 3 with lock: 4 remaining.add(myname) 5 print(‘[{0}] Started {1}‘.format(ctime(), myname)) 6 sleep(nesc) 7 with lock: 8 remaining.remove(myname) 9 print(‘[{0}] Completed {1} ({2} secs)‘.format(ctime(), myname, nesc)) 10 print(‘ (remaining: {0})‘.format(remaining or ‘None‘))
信號量示例
鎖非常易於理解和實現,也很容易決定何時需要它們,然而,如果情況更加復雜,可能需要一個更強大的同步原語來代替鎖。
信號量是最古老的同步原語之一。它是一個計數器,當資源消耗時遞減,當資源釋放時遞增。可以認為信號量代表它們的資源可用或不可用。信號量比鎖更加靈活,因為可以有多個線程,每個線程都擁有有限資源的一個實例。
-
消耗資源使計數器遞減的操作習慣上稱為 P() —— acquire ;
-
當一個線程對一個資源完成操作時,該資源需要返回資源池中,這個操作一般稱為 V() —— release 。
示例,糖果機和信號量(candy.py):
*註:該腳本使用了鎖和信號量來模擬一個糖果機
1 # -*- coding=utf-8 -*- 2 from atexit import register 3 from random import randrange 4 from threading import BoundedSemaphore, Lock, Thread 5 from time import sleep, ctime 6 ? 7 lock = Lock() 8 MAX = 5 9 candytray = BoundedSemaphore(MAX) 10 ? 11 def refill(): 12 lock.acquire() 13 print(‘Refilling candy‘) 14 try: 15 candytray.release() # 釋放資源 16 except ValueError: 17 print(‘full, skipping‘) 18 else: 19 print(‘OK‘) 20 lock.release() 21 ? 22 def buy(): 23 lock.acquire() 24 print(‘Buying candy...‘) 25 if candytray.acquire(False): # 消耗資源 26 print(‘OK‘) 27 else: 28 print(‘empty, skipping‘) 29 lock.release() 30 ? 31 def producer(loops): 32 for i in range(loops): 33 refill() 34 sleep(randrange(3)) 35 ? 36 def consumer(loops): 37 for i in range(loops): 38 buy() 39 sleep(randrange(3)) 40 ? 41 def _main(): 42 print(‘starting at:{0}‘.format(ctime())) 43 nloops = randrange(2, 6) 44 print(‘THE CANDY MACHINE (full with %d bars)!‘ % MAX) 45 Thread(target=consumer, args=(randrange(nloops, nloops+MAX+2),)).start() 46 Thread(target=producer, args=(nloops,)).start() 47 ? 48 @register 49 def _atexit(): 50 print(‘all DONE at:{0}‘.format(ctime())) 51 ? 52 if __name__ == ‘__main__‘: 53 _main()
執行結果為:
1 /usr/local/bin/python3.6 ~/Test_Temporary/candy.py 2 starting at:Sun Jul 29 21:12:50 2018 3 THE CANDY MACHINE (full with 5 bars)! 4 Buying candy... 5 OK 6 Refilling candy 7 OK 8 Refilling candy 9 full, skipping 10 Buying candy... 11 OK 12 Buying candy... 13 OK 14 all DONE at:Sun Jul 29 21:12:52 2018
多線程實踐—Python多線程編程