1. 程式人生 > >理解Python併發程式設計

理解Python併發程式設計

前言

對我來說,程式設計的樂趣之一是想辦法讓程式執行的越來越快,程式碼越寫越優雅。在剛開始學習併發程式設計時,相信你它會有一些困惑,本文將解釋多個併發開發的問題並幫助你快速瞭解併發程式設計的不同場景和應該使用的解決方案。

GIL

Python(特指CPython)的多執行緒的程式碼並不能利用多核的優勢,而是通過著名的全域性解釋鎖(GIL)來進行處理的。如果是一個計算型的任務,使用多執行緒GIL就會讓多執行緒變慢。我們舉個計算斐波那契數列的例子:

12345678910111213141516171819202122232425262728293031323334353637383940 # coding=utf-8
import timeimport threadingdef profile(func): def wrapper(*args, **kwargs): import time start = time.time() func(*args, **kwargs) end = time.time() print 'COST: {}'.format(end - start) return wrapperdef fib(n): if n<= 2: return 1 return fib(n-1
) + fib(n-2)@profiledef nothread(): fib(35) fib(35)@profiledef hasthread(): for i in range(2): t = threading.Thread(target=fib, args=(35,)) t.start() main_thread = threading.currentThread() for t in threading.enumerate(): if t is main_thread: continue t.join()nothread()hasthread()

執行的結果你猜猜會怎麼樣:

123 ❯ python profile_thread.pyCOST: 5.05716490746COST: 6.75599503517

這種情況還不如不用多執行緒!

GIL是必須的,這是Python設計的問題:Python直譯器是非執行緒安全的。這意味著當從執行緒內嘗試安全的訪問Python物件的時候將有一個全域性的強制鎖。 在任何時候,僅僅一個單一的執行緒能夠獲取Python物件或者C API。每100個位元組的Python指令直譯器將重新獲取鎖,這(潛在的)阻塞了I/O操作。因為鎖,CPU密集型的程式碼使用執行緒庫時,不會獲得性能的提高(但是當它使用之後介紹的多程序庫時,效能可以獲得提高)。

那是不是由於GIL的存在,多執行緒庫就是個「雞肋」呢?當然不是。事實上我們平時會接觸非常多的和網路通訊或者資料輸入/輸出相關的程式,比如網路爬蟲、文字處理等等。這時候由於網路情況和I/O的效能的限制,Python直譯器會等待讀寫資料的函式呼叫返回,這個時候就可以利用多執行緒庫提高併發效率了。

同步機制

Python執行緒包含多種同步機制:

1. Semaphore(訊號量)

在多執行緒程式設計中,為了防止不同的執行緒同時對一個公用的資源(比如全部變數)進行修改,需要進行同時訪問的數量(通常是1)。訊號量同步基於內部計數器,每呼叫一次acquire(),計數器減1;每呼叫一次release(),計數器加1.當計數器為0時,acquire()呼叫被阻塞。

123456789101112131415161718192021222324 import timefrom random import randomfrom threading import Thread, Semaphoresema = Semaphore(3)def foo(tid): with sema: print '{} acquire sema'.format(tid) wt = random() * 2 time.sleep(wt) print '{} release sema'.format(tid)threads = []for i in range(5): t = Thread(target=foo, args=(i,)) threads.append(t) t.start()for t in threads: t.join()

這個例子中,我們限制了同時能訪問資源的數量為3。看一下執行的效果:

1234567891011 ❯ python semaphore.py0 acquire sema1 acquire sema 2 acquire sema2 release sema 3 acquire sema1 release sema 4 acquire sema0 release sema3 release sema4 release sema

2. Lock(鎖)

Lock也可以叫做互斥鎖,其實相當於訊號量為1。我們先看一個不加鎖的例子:

123456789101112131415161718192021222324 import timefrom threading import Threadvalue = 0def getlock(): global value new = value + 1 time.sleep(0.001) # 使用sleep讓執行緒有機會切換 value = newthreads = []for i in range(100): t = Thread(target=getlock) t.start() threads.append(t)for t in threads: t.join()print value

執行一下:

12 ❯ python nolock.py16

大寫的黑人問號。不加鎖的情況下,結果會遠遠的小於100。那我們加上互斥鎖看看:

12345678910111213141516171819202122232425 import timefrom threading import Thread, Lockvalue = 0lock = Lock()def getlock(): global value with lock: new = value + 1 time.sleep(0.001) value = newthreads = []for i in range(100): t = Thread(target=getlock) t.start() threads.append(t)for t in threads: t.join()print value

我們對value的自增加了鎖,就可以保證了結果了:

12 ❯ python lock.py100

3. RLock(可重入鎖)

acquire() 能夠不被阻塞的被同一個執行緒呼叫多次。但是要注意的是release()需要呼叫與acquire()相同的次數才能釋放鎖。

4. Condition(條件)

一個執行緒等待特定條件,而另一個執行緒發出特定條件滿足的訊號。最好說明的例子就是「生產者/消費者」模型:

12345678910111213141516171819202122232425262728 import timeimport threadingdef consumer(cond): t = threading.currentThread() with cond: cond.wait() # wait()方法建立了一個名為waiter的鎖,並且設定鎖的狀態為locked。這個waiter鎖用於執行緒間的通訊 print '{}: Resource is available to consumer'.format(t.name)def producer(cond): t = threading.currentThread() with cond: print '{}: Making resource available'.format(t.name) cond.notifyAll() # 釋放waiter鎖,喚醒消費者condition = threading.Condition()c1 = threading.Thread(name='c1', target=consumer, args=(condition,))c2 = threading.Thread(name='c2', target=consumer, args=(condition,))p = threading.Thread(name='p', target=producer, args=(condition,))c1.start()time.sleep(1)c2.start()time.sleep(1)p.start()

執行一下:

1234 ❯ python condition.pyp: Making resource availablec2: Resource is available to consumerc1: Resource is available to consumer

可以看到生產者傳送通知之後,消費者都收到了。

5. Event

一個執行緒傳送/傳遞事件,另外的執行緒等待事件的觸發。我們同樣的用「生產者/消費者」模型的例子:

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647 # coding=utf-8import timeimport threadingfrom random import randintTIMEOUT = 2def consumer(event, l): t = threading.currentThread() while 1: event_is_set = event.wait(TIMEOUT) if event_is_set: try: integer = l.pop() print '{} popped from list by {}'.format(integer, t.name) event.clear() # 重置事件狀態 except IndexError: # 為了讓剛啟動時容錯 passdef producer(event, l): t = threading.currentThread() while 1: integer = randint(10, 100) l.append(integer) print '{} appended to list by {}'.format(integer, t.name) event.set() # 設定事件 time.sleep(1)event = threading.Event()l = []threads = []for name in ('consumer1', 'consumer2'): t = threading.Thread(name=name, target=consumer, args=(event, l)) t.start() threads.append(t)p = threading.Thread(name='producer1', target=producer, args=(event, l))p.start()threads.append(p)for t in threads: t.join()

執行的效果是這樣的:

123456789101112 77 appended to list by producer177 popped from list by consumer146 appended to list by producer146 popped from list by consumer243 appended to list by producer143 popped from list by consumer237 appended to list by producer137 popped from list by consumer233 appended to list by producer133 popped from list by consumer257 appended to list by producer157 popped from list by consumer1

可以看到事件被2個消費者比較平均的接收並處理了。如果使用了wait方法,執行緒就會等待我們設定事件,這也有助於保證任務的完成。

6. Queue

佇列在併發開發中最常用的。我們藉助「生產者/消費者」模式來理解:生產者把生產的「訊息」放入佇列,消費者從這個佇列中對去對應的訊息執行。

大家主要關心如下4個方法就好了:

  1. put: 向佇列中新增一個項。
  2. get: 從佇列中刪除並返回一個項。
  3. task_done: 當某一項任務完成時呼叫。
  4. join: 阻塞直到所有的專案都被處理完。
123456789101112131415161718192021222324252627282930 # coding=utf-8import timeimport threadingfrom random import randomfrom Queue import Queueq = Queue()def double(n): return n * 2def producer(): while 1: wt = random() time.sleep(wt) q.put((double, wt))def consumer(): while 1: task, arg = q.get() print arg, task(arg) q.task_done()for target in(producer, consumer): t = threading.Thread(target=target) t.start()

這就是最簡化的佇列架構。

Queue模組還自帶了PriorityQueue(帶有優先順序)和LifoQueue(後進先出)2種特殊佇列。我們這裡展示下執行緒安全的優先順序佇列的用法,
PriorityQueue要求我們put的資料的格式是(priority_number, data),我們看看下面的例子:

123456789101112131415161718192021222324252627282930313233343536373839 import timeimport threadingfrom random import randintfrom Queue import PriorityQueueq = PriorityQueue()def double(n): return n * 2def producer(): count = 0 while 1: if count > 5: break pri = randint(0, 100) print 'put :{}'.format(pri) q.put((pri, double, pri)) # (priority, func, args) count += 1def consumer(): while 1: if q.empty(): break pri, task, arg = q.get() print '[PRI:{}] {} * 2 = {}'.format(pri, arg, task(arg)) q.task_done() time.sleep(0.1)t = threading.Thread(target=producer)t.start()time.sleep(1)t = threading.Thread(target=consumer)t.start()

其中消費者是故意讓它執行的比生產者慢很多,為了節省篇幅,只隨機產生5次隨機結果。我們看下執行的效果:

1234567891011121314 ❯ python priority_queue.pyput :84put :86put :16put :93put :14put :93[PRI:14] 14 * 2 = 28[PRI:16] 16 * 2 = 32[PRI:84] 84 * 2 = 168[PRI:86] 86 * 2 = 172[PRI:93] 93 * 2 = 186[PRI:93] 93 * 2 = 186

可以看到put時的數字是隨機的,但是get的時候先從優先順序更高(數字小表示優先順序高)開始獲取的。

執行緒池

面向物件開發中,大家知道建立和銷燬物件是很費時間的,因為建立一個物件要獲取記憶體資源或者其它更多資源。無節制的建立和銷燬執行緒是一種極大的浪費。那我們可不可以把執行完任務的執行緒不銷燬而重複利用呢?彷彿就是把這些執行緒放進一個池子,一方面我們可以控制同時工作的執行緒數量,一方面也避免了建立和銷燬產生的開銷。

執行緒池在標準庫中其實是有體現的,只是在官方文章中基本沒有被提及:

1234 In : from multiprocessing.pool import ThreadPoolIn : pool = ThreadPool(5)In : pool.map(lambda x: x**2, range(5))Out: [0, 1, 4, 9, 16]

當然我們也可以自己實現一個:

相關推薦

理解 Python 併發程式設計中的 join 方法

程式碼清單 - 1: from multiprocessing import Process from time import ctime, sleep def clock(interval): while True: print("

理解Python併發程式設計-PoolExecutor篇

之前我們使用多執行緒(threading)和多程序(multiprocessing)完成常規的需求,在啟動的時候start、jon等步驟不能省,複雜的需要還要用1-2個佇列。隨著需求越來越複雜,如果沒有良好的設計和抽象這部分的功能層次,程式碼量越多除錯的難度就越大。有沒有

理解Python併發程式設計一篇就夠了|執行緒篇

前言 程式設計的樂趣之一是想辦法讓程式執行的越來越快,程式碼越寫越優雅。在剛開始學習併發程式設計時,相信你它會有一些困惑,本來這是一篇解釋多個併發開發的問題並幫助你快速瞭解併發程式設計的不同場景和應該使用的解決方案的文章,但是受微信文章長度限制和筆者對閱讀體驗的擔心,

理解Python併發程式設計

前言 對我來說,程式設計的樂趣之一是想辦法讓程式執行的越來越快,程式碼越寫越優雅。在剛開始學習併發程式設計時,相信你它會有一些困惑,本文將解釋多個併發開發的問題並幫助你快速瞭解併發程式設計的不同場景和應該使用的解決方案。 GIL Python(特指CPython)的

Python併發程式設計之執行緒池/程序池

Python併發程式設計之執行緒池/程序池 2017/01/18 · 基礎知識 · 2 評論 · 併發, 執行緒池, 程序池 原文出處: ZiWenXie    引言 Pyt

基於JVM原理、JMM模型和CPU快取模型深入理解Java併發程式設計

許多以Java多執行緒開發為主題的技術書籍,都會把對Java虛擬機器和Java記憶體模型的講解,作為講授Java併發程式設計開發的主要內容,有的還深入到計算機系統的記憶體、CPU、快取等予以說明。實際上,在實際的Java開發工作中,僅僅瞭解併發程式設計的建立、啟動、管理和通訊等基本知識還是不夠的。一

Python併發程式設計之同步\非同步and阻塞\非阻塞

一、什麼是程序 程序: 正在進行的一個過程或者說一個任務。而負責執行任務則是cpu。 程序和程式的區別: 程式僅僅只是一堆程式碼而已,而程序指的是程式的執行過程。 需要強調的是:同一個程式執行兩次,那也是兩個程序,比如開啟暴風影音,雖然都是同一個軟體,但是一個可以播郭德綱,一個可以播高曉鬆。 二、並行

Python併發程式設計之多執行緒使用

目錄 一 開啟執行緒的兩種方式 二 在一個程序下開啟多個執行緒與在一個程序下開啟多個子程序的區別 三 練習 四 執行緒相關的其他方法 五 守護執行緒 六 Python GIL(Global Interpreter Lock) 七 同步鎖 八 死鎖現象

15.python併發程式設計(執行緒--程序--協程)

一.程序:1.定義:程序最小的資源單位,本質就是一個程式在一個數據集上的一次動態執行(執行)的過程2.組成:程序一般由程式,資料集,程序控制三部分組成:(1)程式:用來描述程序要完成哪些功能以及如何完成(2)資料集:是程式在執行過程中所需要使用的一切資源(3)程序控制塊:用來記錄程序外部特徵,描述程序的執行變

Python併發程式設計與IO模型

事件驅動 通常,我們寫伺服器處理模型的程式時,有以下幾種模型: 每收到一個請求,建立一個新的程序,來處理該請求; 每收到一個請求,建立一個新的執行緒,來處理該請求; 每收到一個請求,放入一個事件列表,讓主程序通過非阻塞I/O方式來處理請求,通常也可以理解為協程模式。 第(1)

Python併發程式設計系列之多執行緒

1引言 2 建立執行緒   2.1 函式的方式建立執行緒   2.2 類的方式建立執行緒 3 Thread類的常用屬性和方法   3.1 守護執行緒:Deamon   3.2 join()方法 4 執行緒間的同步機制   4.1 互斥鎖:Lock   4.2 遞迴鎖:RLock   4.3

Python併發程式設計系列之協程

1 引言 2 協程的意義   2.1 什麼是協程   2.2 協程的作用   2.3 相關概念 3 定義協程 4 使用協程   4.1 單個協程   4.2多協程併發   4.3 獲取返回值   4.4 繫結回撥函式   4.5 協程的巢狀使用 5 總結   1 引言

基於JVM原理JMM模型和CPU快取模型深入理解Java併發程式設計

許多以Java多執行緒開發為主題的技術書籍,都會把對Java虛擬機器和Java記憶體模型的講解,作為講授Java併發程式設計開發的主要內容,有的還深入到計算機系統的記憶體、CPU、快取等予以說明。實際上,在實際的Java開發工作中,僅僅瞭解併發程式設計的建立、啟動、管理和通訊等基本知識還是不夠的。一方面,如果

python併發程式設計2

一、訊號量 # 多程序中的元件 # ktv # 4個 # 一套資源 同一時間 只能被n個人訪問 # 某一段程式碼 同一時間 只能被n個程序執行 from multiprocessing import Process,Semaphore import time import random def

python併發程式設計1

主程序和子程序 執行結果: 一旦程序建立後,程序就由作業系統排程 程式碼解析: 子程序與父程序 所以主程序的父程序就是pycharm args傳參 一個引數 兩個引數 join作用(建立多個執行緒):

Python併發程式設計之常用概念剖析

1 引言   併發、並行、序列、同步、非同步、阻塞、非阻塞、程序、執行緒、協程是併發程式設計中的常見概念,相似卻也有卻不盡相同,令人頭痛,這一篇博文中我們來區分一下這些概念。 2 併發與並行   在解釋併發與並行之前,我們必須先明確:單個處理器(一個單核CPU)在某一個時刻只能處理一個執

Python--併發程式設計(3)

Python--協程 協程介紹 協程:是單執行緒下的併發,又稱微執行緒,纖程。英文名Coroutine。一句話說明什麼是執行緒:協程是一種使用者態的輕量級執行緒,即協程是由使用者程式自己控制排程的 需要強調的是: #1. python的執行緒屬於核心級別的,即由作業系統控制排程(如單執行緒遇到i

Python學習【第21篇】:程序池以及回撥函式 python併發程式設計之多程序2-------------資料共享及程序池和回撥函式

python併發程式設計之多程序2-------------資料共享及程序池和回撥函式 一、資料共享 1.程序間的通訊應該儘量避免共享資料的方式 2.程序

Python學習【第20篇】:互斥鎖以及程序之間的三種通訊方式(IPC)以及生產者個消費者模型 python併發程式設計之多程序1-----------互斥鎖與程序間的通訊

python併發程式設計之多程序1-----------互斥鎖與程序間的通訊 一、互斥鎖 程序之間資料隔離,但是共享一套檔案系統,因而可以通過檔案來實現程序直接的通訊,

Python學習【第24篇】:死鎖,遞迴鎖,訊號量,Event事件,執行緒Queue python併發程式設計之多執行緒2------------死鎖與遞迴鎖,訊號量等

python併發程式設計之多執行緒2------------死鎖與遞迴鎖,訊號量等 一、死鎖現象與遞迴鎖 程序也是有死鎖的 所謂死鎖: 是指兩個或兩個以上