1. 程式人生 > >多線程實踐—Python多線程編程

多線程實踐—Python多線程編程

request val 一個 簡單 temp sat 類型 tar 耗時

多線程實踐

前面的一些文章和腳本都是只能做學習多線程的原理使用,實際上什麽有用的事情也沒有做。接下來進行多線程的實踐,看一看在實際項目中是怎麽使用多線程的。

圖書排名示例

Bookrank.py:

該腳本通過單線程進行下載圖書排名信息的調用

?
 1 from atexit import register
 2 from re import compile
 3 from threading import Thread
 4 from time import sleep, ctime
 5 import requests
 6 ?
 7 REGEX = compile(#([\d,]+) in Books
) 8 AMZN = https://www.amazon.com/dp/ 9 ISBNS = { 10 0132269937: Core Python Programming, 11 0132356139: Python Web Development with Django, 12 0137143419: Python Fundamentals, 13 } 14 ? 15 def getRanking(isbn): 16 url = %s%s % (AMZN, isbn) 17 page = requests.get(url)
18 data = page.text 19 return REGEX.findall(data)[0] 20 ? 21 def _showRanking(isbn): 22 print - %r ranked %s % ( 23 ISBNS[isbn], getRanking(isbn)) 24 ? 25 def _main(): 26 print At, ctime(), on Amazon 27 for isbn in ISBNS: 28 _showRanking(isbn) 29 ? 30 @register
31 def _atexit(): 32 print all DONE at:, ctime() 33 ? 34 if __name__ == __main__: 35 _main() 36 ?

 

輸出結果為:

1 /usr/bin/python ~/Test_Temporary/bookrank.py
2 At Sat Jul 28 17:16:51 2018 on Amazon
3 - Core Python Programming ranked 322,656
4 - Python Fundamentals ranked 4,739,537
5 - Python Web Development with Django ranked 1,430,855
6 all DONE at: Sat Jul 28 17:17:08 2018
 

引入線程

上面的例子只是一個單線程程序,下面引入線程,並使用多線程再執行程序對比各自所需的時間。

? 將上面腳本中 _main() 函數的 _showRanking(isbn) 修改以下代碼:

?

Thread(target=_showRanking, args=(isbn,)).start()

再次執行查看返回結果:

1 /usr/bin/python ~/Test_Temporary/bookrank.py
2 At Sat Jul 28 17:39:16 2018 on Amazon
3 - Python Fundamentals ranked 4,739,537
4 - Python Web Development with Django ranked 1,430,855
5 - Core Python Programming ranked 322,656
6 all DONE at: Sat Jul 28 17:39:19 2018

從兩個的輸出結果中可以看出,使用單線程時總體完成的時間為 7s ,而使用多線程時,總體完成時間為 3s 。另外一個需要註意的是,單線程版本是按照變量的順序輸出,而多線程版本按照完成的順序輸出。

同步原語

一般在多線程代碼中,總會有一些特定的函數或代碼塊不希望(或不應該)被多個線程同時執行,通常包括修改數據庫、更新文件或其它會產生竟態條件的類似情況。這就是需要使用同步的情況。

  • 當任意數量的線程可以訪問臨界區的代碼,但給定的時刻只有一個線程可以通過時,就是使用同步的時候了;

  • 程序員選擇適合的同步原語,或者線程控制機制來執行同步;

  • 進程同步有不同的類型【參見:https://en.wikipedia.org/wiki/Synchronization_(computer_science) 】

  • 同步原語有:鎖/互斥、信號量。鎖是最簡單、最低級的機制,而信號量用於多線程競爭有限資源的情況。

鎖示例

鎖有兩種狀態:鎖定和未鎖定。而且它也只支持兩個函數:獲得鎖和釋放鎖。

  • 當多線程爭奪鎖時,允許第一個獲得鎖的線程進入臨界區,並執行代碼;

  • 所有之後到達的線程將被阻塞,直到第一個線程結束退出臨界區並釋放鎖;

  • 鎖被釋放後,其它等待的線程可以繼續爭奪鎖,並進入臨界區;

  • 被阻塞的線程沒有順序,不會先到先得,勝出的線程是不確定的。

代碼示例(mtsleepF.py):

*註:該腳本派生了隨機數量的線程,每個線程執行結束時會進行輸出

 1 # -*- coding=utf-8 -*-
 2 from atexit import register
 3 from random import randrange
 4 from threading import Thread, currentThread
 5 from time import sleep, ctime
 6 ?
 7 class CleanOutputSet(set):
 8     def __str__(self):
 9         return , .join(x for x in self)
10 ?
11 loops = (randrange(2, 5) for x in range(randrange(3, 7)))
12 remaining = CleanOutputSet()
13 ?
14 def loop(nsec):
15     myname = currentThread().name
16     remaining.add(myname)
17     print(這個是目前線程池中的線程:, remaining)
18     print([%s] Started %s % (ctime(), myname))
19     sleep(nsec)
20     remaining.remove(myname)
21     print([%s] Completed %s (%d secs) % (ctime(), myname, nsec))
22     print( (remaining: %s) % (remaining or None))
23 ?
24 def _main():
25     for pause in loops:
26         Thread(target=loop, args=(pause,)).start()
27 ?
28 @register
29 def _atexit():
30     print(all DONE at:%s % ctime())
31 ?
32 if __name__ == __main__:
33     _main()
 

執行後的輸出結果:

 1 /usr/local/bin/python3.6 /Users/zhenggougou/Project/Test_Temporary/mtsleepF.py
 2 這個是目前線程池中的線程: Thread-1
 3 [Sat Jul 28 21:09:44 2018] Started Thread-1
 4 這個是目前線程池中的線程: Thread-2, Thread-1
 5 [Sat Jul 28 21:09:44 2018] Started Thread-2
 6 這個是目前線程池中的線程: Thread-3, Thread-2, Thread-1
 7 [Sat Jul 28 21:09:44 2018] Started Thread-3
 8 這個是目前線程池中的線程: Thread-3, Thread-2, Thread-4, Thread-1
 9 [Sat Jul 28 21:09:44 2018] Started Thread-4
10 這個是目前線程池中的線程: Thread-5, Thread-4, Thread-3, Thread-2, Thread-1
11 [Sat Jul 28 21:09:44 2018] Started Thread-5
12 這個是目前線程池中的線程: Thread-5, Thread-6, Thread-4, Thread-3, Thread-2, Thread-1
13 [Sat Jul 28 21:09:44 2018] Started Thread-6
14 [Sat Jul 28 21:09:46 2018] Completed Thread-2 (2 secs)
15 [Sat Jul 28 21:09:46 2018] Completed Thread-1 (2 secs)
16 [Sat Jul 28 21:09:46 2018] Completed Thread-3 (2 secs)
17  (remaining: Thread-5, Thread-6, Thread-4)
18 [Sat Jul 28 21:09:46 2018] Completed Thread-6 (2 secs)
19  (remaining: Thread-5, Thread-4)
20 [Sat Jul 28 21:09:46 2018] Completed Thread-4 (2 secs)
21  (remaining: Thread-5)
22  (remaining: Thread-5)
23 [Sat Jul 28 21:09:46 2018] Completed Thread-5 (2 secs)
24  (remaining: None)
25  (remaining: None)
26 all DONE at:Sat Jul 28 21:09:46 2018

從執行結果中可以看出,有的時候可能會存在多個線程並行執行操作刪除 remaining 集合中數據的情況。比如上面結果中,線程1、2、3 就是同時執行去刪除集合中數據的。所以為了避免這種情況需要加鎖,通過引入 Lock (或 RLock),然後創建一個鎖對象來保證數據的修改每次只有一個線程能操作。

  1. 首先先導入鎖類,然後創建鎖對象

    from threading import Thread, Lock, currentThread

    lock = Lock()

  2. 然後使用創建的鎖,將上面 mtsleepF.py 腳本中 loop() 函數做以下改變:

     1 def loop(nsec):
     2     myname = currentThread().name
     3     lock.acquire() # 獲取鎖
     4     remaining.add(myname)
     5     print(這個是目前線程池中的線程:, remaining)
     6     print([%s] Started %s % (ctime(), myname))
     7     lock.release() # 釋放鎖
     8     sleep(nsec)
     9     lock.acquire() # 獲取鎖
    10     remaining.remove(myname)
    11     print([%s] Completed %s (%d secs) % (ctime(), myname, nsec))
    12     print( (remaining: %s) % (remaining or None))
    13     lock.release() # 釋放鎖

在操作變量的前後需要進行獲取鎖和釋放鎖的操作,以保證在修改變量時只有一個線程進行。上面的代碼有兩處修改變量,一是:remaining.add(myname) ,二是:remaining.remove(myname)。 所以上面代碼中有兩次獲取鎖和釋放鎖的操作。其實還有一種方案可以不再調用鎖的 acquire()release() 方法,二是使用上下文管理,進一步簡化代碼。代碼如下:

 1 def loop(nesc):
 2     myname = currentThread().name
 3     with lock:
 4         remaining.add(myname)
 5         print([{0}] Started {1}.format(ctime(), myname))
 6     sleep(nesc)
 7     with lock:
 8         remaining.remove(myname)
 9         print([{0}] Completed {1} ({2} secs).format(ctime(), myname, nesc))
10         print( (remaining: {0}).format(remaining or None))

 

信號量示例

鎖非常易於理解和實現,也很容易決定何時需要它們,然而,如果情況更加復雜,可能需要一個更強大的同步原語來代替鎖。

信號量是最古老的同步原語之一。它是一個計數器,當資源消耗時遞減,當資源釋放時遞增。可以認為信號量代表它們的資源可用或不可用。信號量比鎖更加靈活,因為可以有多個線程,每個線程都擁有有限資源的一個實例。

  • 消耗資源使計數器遞減的操作習慣上稱為 P() —— acquire ;

  • 當一個線程對一個資源完成操作時,該資源需要返回資源池中,這個操作一般稱為 V() —— release 。

示例,糖果機和信號量(candy.py):

*註:該腳本使用了鎖和信號量來模擬一個糖果機

 1 # -*- coding=utf-8 -*-
 2 from atexit import register
 3 from random import randrange
 4 from threading import BoundedSemaphore, Lock, Thread
 5 from time import sleep, ctime
 6 ?
 7 lock = Lock()
 8 MAX = 5
 9 candytray = BoundedSemaphore(MAX)
10 ?
11 def refill():
12     lock.acquire()
13     print(Refilling candy)
14     try:
15         candytray.release() # 釋放資源
16     except ValueError:
17         print(full, skipping)
18     else:
19         print(OK)
20     lock.release()
21 ?
22 def buy():
23     lock.acquire()
24     print(Buying candy...)
25     if candytray.acquire(False): # 消耗資源
26         print(OK)
27     else:
28         print(empty, skipping)
29     lock.release()
30 ?
31 def producer(loops):
32     for i in range(loops):
33         refill()
34         sleep(randrange(3))
35 ?
36 def consumer(loops):
37     for i in range(loops):
38         buy()
39         sleep(randrange(3))
40 ?
41 def _main():
42     print(starting at:{0}.format(ctime()))
43     nloops = randrange(2, 6)
44     print(THE CANDY MACHINE (full with %d bars)! % MAX)
45     Thread(target=consumer, args=(randrange(nloops, nloops+MAX+2),)).start()
46     Thread(target=producer, args=(nloops,)).start()
47 ?
48 @register
49 def _atexit():
50     print(all DONE at:{0}.format(ctime()))
51 ?
52 if __name__ == __main__:
53     _main()

執行結果為:

 1 /usr/local/bin/python3.6 ~/Test_Temporary/candy.py
 2 starting at:Sun Jul 29 21:12:50 2018
 3 THE CANDY MACHINE (full with 5 bars)!
 4 Buying candy...
 5 OK
 6 Refilling candy
 7 OK
 8 Refilling candy
 9 full, skipping
10 Buying candy...
11 OK
12 Buying candy...
13 OK
14 all DONE at:Sun Jul 29 21:12:52 2018

多線程實踐—Python多線程編程