1. 程式人生 > >Python幾種並發實現方案的性能比較

Python幾種並發實現方案的性能比較

什麽 _id 虛擬機 並發模型 chm 14. \n 爭奪 系統資源

偶然看到Erlang vs. Stackless python: a first benchmark,對Erlang和Stackless Python的並發處理性能進行了實驗比較,基本結論認為二者有比較相近的性能。我看完產生的問題是,Stackless Python與Python的其他並發實現機制性能又會有多大區別呢,比如線程和進程。因此我采用與這篇文章相同的辦法來對Stackless Python、普通Python的thread模塊、普通Python的threading模塊、普通Python的processing模塊這四種並發實現方案進行了性能實驗,並將實驗過程和基本結果記錄在這裏。

後來看到了基於greenlet實現的高性能網絡框架Eventlet,因而更新了實驗方案,將greenlet也加入了比較,雖然greenlet並非是一種真正意義上的並發處理,而是在單個線程下對程序塊進行切換輪流執行。

(Edit)

2.  實驗方案

實驗方案與Erlang vs. Stackless python: a first benchmark是相同的,用每種方案分別給出如下問題的實現,記錄完成整個處理過程的總時間來作為評判性能的依據:

  1. 由n個節點組成一個環狀網絡,在上面傳送共m個消息。
  2. 將每個消息(共m個),逐個發送給1號節點。
  3. 第1到n-1號節點在接收到消息後,都轉發給下一號節點。
  4. 第n號節點每次收到消息後,不再繼續轉發。
  5. 當m個消息都從1號逐個到達第n號節點時,認為全部處理結束。
(Edit)

2.1  硬件平臺

Macbook Pro 3,1上的Vmware Fusion 1.0虛擬機中,註意這裏給虛擬機只啟用了cpu的單個核心:

  • 原始Cpu:Core 2 Duo,2.4 GHz,2核心,4 MB L2 緩存,總線速度800 MHz
  • 分配給虛擬機的內存:796M

(單個CPU,還能比較並發嗎?)

(Edit)

2.2  軟件平臺

Vmware Fusion 1.0下的Debian etch:

  • 原始Python:Debian發行版自帶Python 2.4.4
  • Python 2.4.4 Stackless 3.1b3 060516
  • processing-0.52-py2.4-linux-i686.egg
  • 原始Python下的greenlet實現:py lib 0.9.2
(Edit)

3.  實驗過程及結果

各方案的實現代碼見後文。實驗時使用time指令記錄每次運行的總時間,選用的都是不做任何輸出的no_io實現(Python的print指令還是挺耗資源的,如果不註釋掉十有八九得影響測試結果),每次執行時設定n=300,m=10000(Erlang vs. Stackless python: a first benchmark文章中認為n可以設置為300,m則可以取10000到90000之間的數值分別進行測試)。

(Edit)

3.1  Stackless Python的實驗結果

real	0m1.651s
user	0m1.628s
sys	0m0.020s

即使將m擴大到30000,實驗結果仍然很突出:

real	0m4.749s
user	0m4.716s
sys	0m0.028s
(Edit)

3.2  使用thread模塊的實驗結果

real	1m13.009s
user	0m2.476s
sys	0m59.028s
(Edit)

3.3  使用threading模塊配合Queue模塊的實驗結果

不太穩定,有時候這樣:

real	1m9.222s
user	0m34.418s
sys	0m34.622s

也有時這樣:

real	2m14.016s
user	0m6.644s
sys	2m7.260s
(Edit)

3.4  使用processing模塊配合Queue模塊的實驗結果

real	3m43.539s
user	0m15.345s
sys	3m27.953s
(Edit)

3.5  greenlet模塊的實驗結果

real	0m9.225s
user	0m0.644s
sys	0m8.581s
(Edit)

3.6  eventlet模塊的實驗結果

註意!eventlet 的這個實驗結果是後來增補的,硬件平臺沒變,但是是直接在 OSX 自帶 Python 2.5 環境下執行出來的,同時系統中還有 Firefox 等很多程序也在爭奪系統資源。因此只能作為大致參考,不能與其他幾組數據作直接對比。(其中 eventlet 的版本是 0.9.5)

real    0m21.610s
user    0m20.713s
sys     0m0.215s
(Edit)

4.  結論與分析

(Edit)

4.1  Stackless Python

毫無疑問,Stackless Python幾乎有匪夷所思的並發性能,比其他方案快上幾十倍,而且借助Stackless Python提供的channel機制,實現也相當簡單。也許這個結果向我們部分揭示了沈仙人基於Stackless Python實現的Eurasia3能夠提供相當於c語言效果的恐怖並發性能的原因。

(Edit)

4.2  Python線程

從道理上來講,thread模塊似乎應該和threading提供基本相同的性能,畢竟threading只是對thread的一種封裝嘛,後臺機制應該是一致的。或許threading由於本身類實例維護方面的開銷,應該會比直接用thread慢一點。從實驗結果來看,二者性能也確實差不多。只是不大明白為何threading方案的測試結果不是很穩定,即使對其他方案的測試運行多次,誤差也不會像threading這麽飄。從代碼實現體驗來說,用threading配合Queue比直接用thread實在是輕松太多了,並且出錯的機會也要少很多。

(Edit)

4.3  Python進程

processing模塊給出的進程方案大致比thread線程要慢一倍,並且這是在我特意調整虛擬機給它預備了足夠空閑內存、避免使用交換分區的情況下取得的(特意分給虛擬機700多M內存就是為了這個)。而其他方案僅僅占用數M內存,完全無需特意調大可用內存總量。當然,如果給虛擬機多啟用幾個核心的話,processing也許會占上點便宜,畢竟目前thread模塊是不能有效利用多cpu資源的(經實驗,Stackless Python在開啟雙核的情況下表現的性能和單核是一樣的,說明也是不能有效利用多cpu)。因此一種比較合理的做法是根據cpu的數量,啟用少量幾個進程,而在進程內部再開啟線程進行實際業務處理,這也是目前Python社區推薦的有效利用多cpu資源的辦法。好在processing配合其自身提供的Queue模塊,編程體驗還是比較輕松的。

(Edit)

4.4  greenlet超輕量級方案

基於greenlet的實現則性能僅次於Stackless Python,大致比Stackless Python慢一倍,比其他方案快接近一個數量級。其實greenlet不是一種真正的並發機制,而是在同一線程內,在不同函數的執行代碼塊之間切換,實施“你運行一會、我運行一會”,並且在進行切換時必須指定何時切換以及切換到哪。greenlet的接口是比較簡單易用的,但是使用greenlet時的思考方式與其他並發方案存在一定區別。線程/進程模型在大邏輯上通常從並發角度開始考慮,把能夠並行處理的並且值得並行處理的任務分離出來,在不同的線程/進程下運行,然後考慮分離過程可能造成哪些互斥、沖突問題,將互斥的資源加鎖保護來保證並發處理的正確性。greenlet則是要求從避免阻塞的角度來進行開發,當出現阻塞時,就顯式切換到另一段沒有被阻塞的代碼段執行,直到原先的阻塞狀況消失以後,再人工切換回原來的代碼段繼續處理。因此,greenlet本質是一種合理安排了的串行,實驗中greenlet方案能夠得到比較好的性能表現,主要也是因為通過合理的代碼執行流程切換,完全避免了死鎖和阻塞等情況(執行帶屏幕輸出的ring_greenlet.py我們會看到腳本總是一個一個地處理消息,把一個消息在環上從頭傳到尾之後,再開始處理下一個消息)。因為greenlet本質是串行,因此在沒有進行顯式切換時,代碼的其他部分是無法被執行到的,如果要避免代碼長時間占用運算資源造成程序假死,那麽還是要將greenlet與線程/進程機制結合使用(每個線程、進程下都可以建立多個greenlet,但是跨線程/進程時greenlet之間無法切換或通訊)。

Stackless則比較特別,對很多資源從底層進行了並發改造,並且提供了channel等更適合“並發”的通訊機制實現,使得資源互斥沖突的可能性大大減小,並發性能自然得以提高。粗糙來講,greenlet是“阻塞了我就先幹點兒別的,但是程序員得明確告訴greenlet能先幹點兒啥以及什麽時候回來”;Stackless則是“東西我已經改造好了,你只要用我的東西,並發沖突就不用操心,只管放心大膽地並發好了”。greenlet應該是學習了Stackless的上下文切換機制,但是對底層資源沒有進行適合並發的改造。並且實際上greenlet也沒有必要改造底層資源的並發性,因為它本質是串行的單線程,不與其他並發模型混合使用的話是無法造成對資源的並發訪問的。

(Edit)

greenlet 封裝後的 eventlet 方案

eventlet 是基於 greenlet 實現的面向網絡應用的並發處理框架,提供“線程”池、隊列等與其他 Python 線程、進程模型非常相似的 api,並且提供了對 Python 發行版自帶庫及其他模塊的超輕量並發適應性調整方法,比直接使用 greenlet 要方便得多。並且這個解決方案源自著名虛擬現實遊戲“第二人生”,可以說是久經考驗的新興並發處理模型。其基本原理是調整 Python 的 socket 調用,當發生阻塞時則切換到其他 greenlet 執行,這樣來保證資源的有效利用。需要註意的是:

  • eventlet 提供的函數只能對 Python 代碼中的 socket 調用進行處理,而不能對模塊的 C 語言部分的 socket 調用進行修改。對後者這類模塊,仍然需要把調用模塊的代碼封裝在 Python 標準線程調用中,之後利用 eventlet 提供的適配器實現 eventlet 與標準線程之間的協作。
  • 再有,雖然 eventlet 把 api 封裝成了非常類似標準線程庫的形式,但兩者的實際並發執行流程仍然有明顯區別。在沒有出現 I/O 阻塞時,除非顯式聲明,否則當前正在執行的 eventlet 永遠不會把 cpu 交給其他的 eventlet,而標準線程則是無論是否出現阻塞,總是由所有線程一起爭奪運行資源。所有 eventlet 對 I/O 阻塞無關的大運算量耗時操作基本沒有什麽幫助。

在性能測試結果方面,eventlet 消耗的運行時間大致是 greenlet 方案的 3 到 5 倍,而 Python 標準線程模型的 thread 方式消耗的運行時間大致是 eventlet 測試代碼的 8 到 10 倍。其中前者可能是因為我們在 eventlet 的測試代碼中,使用隊列機制來完成所有的消息傳遞,而隊列上的訪問互斥保護可能額外消耗了一些運算資源。總體而言,eventlet 模型的並發性能雖然比 Stackless Python 和直接使用 greenlet 有一定差距,但仍然比標準線程模型有大約一個數量級的優勢,這也就不奇怪近期很多強調並發性能的網絡服務器實現采取 eventlet 、線程、進程三者組合使用的實現方案。

(Edit)

5.  實驗代碼

實驗代碼下載:

  • 版本3 下載:增加了 eventlet 方案的實驗代碼。
  • 版本2 下載:增加了 greenlet 方案的實驗代碼。
  • 版本1 下載:包括 Stackless Python 、 thread 、 threading 、 processing 四種方案的實驗代碼。

為方便閱讀,將實驗中用到的幾個腳本的代碼粘貼如下,其中Stackless Python方案的代碼實現直接取自Erlang vs. Stackless python: a first benchmark:

(Edit)

5.1  ring_no_io_slp.py

  1. #!/Library/Frameworks/Python.framework/Versions/2.5/bin/python
  2. # encoding: utf-8
  3. import sys
  4. import stackless as SL
  5. def run_benchmark(n, m):
  6. # print(">> Python 2.5.1, stackless 3.1b3 here (N=%d, M=%d)!\n" % (n, m))
  7. firstP = cin = SL.channel()
  8. for s in xrange(1, n):
  9. seqn = s
  10. cout = SL.channel()
  11. # # print("*> s = %d" % (seqn, ))
  12. t = SL.tasklet(loop)(seqn, cin, cout)
  13. cin = cout
  14. else:
  15. seqn = s+1
  16. # # print("$> s = %d" % (seqn, ))
  17. t = SL.tasklet(mloop)(seqn, cin)
  18. for r in xrange(m-1, -1, -1):
  19. # # print("+ sending Msg# %d" % r)
  20. firstP.send(r)
  21. SL.schedule()
  22. def loop(s, cin, cout):
  23. while True:
  24. r = cin.receive()
  25. cout.send(r)
  26. if r > 0:
  27. # print(": Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid(), s, r))
  28. pass
  29. else:
  30. # print("* Proc: <%s>, Seq#: %s, Msg#: terminate!" % (pid(), s))
  31. break
  32. def mloop(s, cin):
  33. while True:
  34. r = cin.receive()
  35. if r > 0:
  36. # print("> Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid(), s, r))
  37. pass
  38. else:
  39. # print("@ Proc: <%s>, Seq#: %s, ring terminated." % (pid(), s))
  40. break
  41. def pid(): return repr(SL.getcurrent()).split()[-1][2:-1]
  42. if __name__ == ‘__main__‘:
  43. run_benchmark(int(sys.argv[1]), int(sys.argv[2]))
[$[Get Code]] (Edit)

5.2  ring_no_io_thread.py

  1. #!/Library/Frameworks/Python.framework/Versions/2.5/bin/python
  2. # encoding: utf-8
  3. import sys, time
  4. import thread
  5. SLEEP_TIME = 0.0001
  6. def run_benchmark(n, m):
  7. # print(">> Python 2.5.1, stackless 3.1b3 here (N=%d, M=%d)!\n" % (n, m))
  8. locks = [thread.allocate_lock() for i in xrange(n)]
  9. firstP = cin = []
  10. cin_lock_id = 0
  11. for s in xrange(1, n):
  12. seqn = s
  13. cout = []
  14. cout_lock_id = s
  15. # print("*> s = %d" % (seqn, ))
  16. thread.start_new_thread(loop, (seqn, locks, cin, cin_lock_id, cout, cout_lock_id))
  17. cin = cout
  18. cin_lock_id = cout_lock_id
  19. else:
  20. seqn = s+1
  21. # print("$> s = %d" % (seqn, ))
  22. thread.start_new_thread(mloop, (seqn, locks, cin, cin_lock_id))
  23. for r in xrange(m-1, -1, -1):
  24. # print("+ sending Msg# %d" % r)
  25. lock = locks[0]
  26. lock.acquire()
  27. firstP.append(r)
  28. lock.release()
  29. time.sleep(SLEEP_TIME)
  30. try:
  31. while True:
  32. time.sleep(SLEEP_TIME)
  33. except:
  34. pass
  35. def loop(s, locks, cin, cin_lock_id, cout, cout_lock_id):
  36. while True:
  37. lock = locks[cin_lock_id]
  38. lock.acquire()
  39. if len(cin) > 0:
  40. r = cin.pop(0)
  41. lock.release()
  42. else:
  43. lock.release()
  44. time.sleep(SLEEP_TIME)
  45. continue
  46. lock = locks[cout_lock_id]
  47. lock.acquire()
  48. cout.append(r)
  49. lock.release()
  50. if r > 0:
  51. # print(": Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid(), s, r))
  52. pass
  53. else:
  54. # print("* Proc: <%s>, Seq#: %s, Msg#: terminate!" % (pid(), s))
  55. break
  56. def mloop(s, locks, cin, cin_lock_id):
  57. while True:
  58. lock = locks[cin_lock_id]
  59. lock.acquire()
  60. if len(cin) > 0:
  61. r = cin.pop(0)
  62. lock.release()
  63. else:
  64. lock.release()
  65. time.sleep(SLEEP_TIME)
  66. continue
  67. if r > 0:
  68. # print("> Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid(), s, r))
  69. pass
  70. else:
  71. # print("@ Proc: <%s>, Seq#: %s, ring terminated." % (pid(), s))
  72. break
  73. thread.interrupt_main()
  74. def pid(): return thread.get_ident()
  75. if __name__ == ‘__main__‘:
  76. run_benchmark(int(sys.argv[1]), int(sys.argv[2]))
[$[Get Code]] (Edit)

5.3  ring_no_io_queue.py

  1. #!/Library/Frameworks/Python.framework/Versions/2.5/bin/python
  2. # encoding: utf-8
  3. import sys
  4. import threading, Queue
  5. def run_benchmark(n, m):
  6. # print(">> Python 2.5.1, stackless 3.1b3 here (N=%d, M=%d)!\n" % (n, m))
  7. firstP = cin = Queue.Queue()
  8. for s in xrange(1, n):
  9. seqn = s
  10. cout = Queue.Queue()
  11. # print("*> s = %d" % (seqn, ))
  12. t = Loop(seqn, cin, cout)
  13. t.setDaemon(False)
  14. t.start()
  15. cin = cout
  16. else:
  17. seqn = s+1
  18. # print("$> s = %d" % (seqn, ))
  19. t = MLoop(seqn, cin)
  20. t.setDaemon(False)
  21. t.start()
  22. for r in xrange(m-1, -1, -1):
  23. # print("+ sending Msg# %d" % r)
  24. firstP.put(r)
  25. class Loop(threading.Thread):
  26. def __init__(self, s, cin, cout):
  27. threading.Thread.__init__(self)
  28. self.cin = cin
  29. self.cout = cout
  30. self.s = s
  31. def run(self):
  32. while True:
  33. r = self.cin.get()
  34. self.cout.put(r)
  35. if r > 0:
  36. # print(": Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid(), self.s, r))
  37. pass
  38. else:
  39. # print("* Proc: <%s>, Seq#: %s, Msg#: terminate!" % (pid(), self.s))
  40. break
  41. class MLoop(threading.Thread):
  42. def __init__(self, s, cin):
  43. threading.Thread.__init__(self)
  44. self.cin = cin
  45. self.s = s
  46. def run(self):
  47. while True:
  48. r = self.cin.get()
  49. if r > 0:
  50. # print("> Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid(), self.s, r))
  51. pass
  52. else:
  53. # print("@ Proc: <%s>, Seq#: %s, ring terminated." % (pid(), self.s))
  54. break
  55. def pid(): return threading.currentThread()
  56. if __name__ == ‘__main__‘:
  57. run_benchmark(int(sys.argv[1]), int(sys.argv[2]))
[$[Get Code]] (Edit)

5.4  ring_no_io_proc.py

  1. #!/Library/Frameworks/Python.framework/Versions/2.5/bin/python
  2. # encoding: utf-8
  3. import sys
  4. import processing, Queue
  5. def run_benchmark(n, m):
  6. # print(">> Python 2.5.1, stackless 3.1b3 here (N=%d, M=%d)!\n" % (n, m))
  7. firstP = cin = processing.Queue()
  8. for s in xrange(1, n):
  9. seqn = s
  10. cout = processing.Queue()
  11. # print("*> s = %d" % (seqn, ))
  12. p = processing.Process(target = loop, args = [seqn, cin, cout])
  13. p.start()
  14. cin = cout
  15. else:
  16. seqn = s+1
  17. # print("$> s = %d" % (seqn, ))
  18. p = processing.Process(target = mloop, args = [seqn, cin])
  19. p.start()
  20. for r in xrange(m-1, -1, -1):
  21. # print("+ sending Msg# %d" % r)
  22. firstP.put(r)
  23. p.join()
  24. def loop(s, cin, cout):
  25. while True:
  26. r = cin.get()
  27. cout.put(r)
  28. if r > 0:
  29. # print(": Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid(), s, r))
  30. pass
  31. else:
  32. # print("* Proc: <%s>, Seq#: %s, Msg#: terminate!" % (pid(), s))
  33. break
  34. def mloop(s, cin):
  35. while True:
  36. r = cin.get()
  37. if r > 0:
  38. # print("> Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid(), s, r))
  39. pass
  40. else:
  41. # print("@ Proc: <%s>, Seq#: %s, ring terminated." % (pid(), s))
  42. break
  43. def pid(): return processing.currentProcess()
  44. if __name__ == ‘__main__‘:
  45. run_benchmark(int(sys.argv[1]), int(sys.argv[2]))
[$[Get Code]] (Edit)

5.5  ring_no_io_greenlet.py

  1. #!/Library/Frameworks/Python.framework/Versions/2.5/bin/python
  2. # encoding: utf-8
  3. import sys
  4. from py.magic import greenlet
  5. def run_benchmark(n, m):
  6. # print(">> Python 2.5.1, stackless 3.1b3 here (N=%d, M=%d)!\n" % (n, m))
  7. glets = [greenlet.getcurrent()]
  8. for s in xrange(1, n):
  9. seqn = s
  10. glets.append(greenlet(loop))
  11. # print("*> s = %d" % (seqn, ))
  12. else:
  13. seqn = s+1
  14. glets.append(greenlet(mloop))
  15. # print("$> s = %d" % (seqn, ))
  16. glets[-1].switch(seqn, glets)
  17. for r in xrange(m-1, -1, -1):
  18. # print("+ sending Msg# %d" % r)
  19. glets[1].switch(r)
  20. def loop(s, glets):
  21. previous = glets[s - 1]
  22. next = glets[s + 1]
  23. if s > 1:
  24. r = previous.switch(s - 1, glets)
  25. else:
  26. r = previous.switch()
  27. while True:
  28. if r > 0:
  29. # print(": Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid("loop", s), s, r))
  30. pass
  31. else:
  32. # print("* Proc: <%s>, Seq#: %s, Msg#: terminate!" % (pid("loop", s), s))
  33. break
  34. next.switch(r)
  35. r = previous.switch()
  36. next.switch(r)
  37. def mloop(s, glets):
  38. previous = glets[s - 1]
  39. r = previous.switch(s - 1, glets)
  40. while True:
  41. if r > 0:
  42. # print("> Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid("mloop", s), s, r))
  43. pass
  44. else:
  45. # print("@ Proc: <%s>, Seq#: %s, ring terminated." % (pid("mloop", s), s))
  46. break
  47. r = previous.switch()
  48. def pid(func, s): return "<<%s(Greenlet-%d, started)>>" % (func, s)
  49. if __name__ == ‘__main__‘:
  50. run_benchmark(int(sys.argv[1]), int(sys.argv[2]))
[$[Get Code]] (Edit)

5.6  ring_no_io_eventlet.py

  1. #!/Library/Frameworks/Python.framework/Versions/2.5/bin/python
  2. # encoding: utf-8
  3. import sys
  4. import eventlet
  5. def run_benchmark(n, m):
  6. # print(">> Python 2.5.1, stackless 3.1b3 here (N=%d, M=%d)!\n" % (n, m))
  7. firstP = cin = eventlet.Queue()
  8. for s in xrange(1, n):
  9. seqn = s
  10. cout = eventlet.Queue()
  11. # print("*> s = %d" % (seqn, ))
  12. eventlet.spawn_n(loop, seqn, cin, cout)
  13. cin = cout
  14. else:
  15. seqn = s+1
  16. # print("$> s = %d" % (seqn, ))
  17. for r in xrange(m-1, -1, -1):
  18. # print("+ sending Msg# %d" % r)
  19. firstP.put(r)
  20. mloop(seqn, cin)
  21. def loop(s, cin, cout):
  22. while True:
  23. r = cin.get()
  24. cout.put(r)
  25. if r > 0:
  26. # print(": Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid(), s, r))
  27. pass
  28. else:
  29. # print("* Proc: <%s>, Seq#: %s, Msg#: terminate!" % (pid(), s))
  30. break
  31. def mloop(s, cin):
  32. while True:
  33. r = cin.get()
  34. if r > 0:
  35. # print("> Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid(), s, r))
  36. pass
  37. else:
  38. # print("@ Proc: <%s>, Seq#: %s, ring terminated." % (pid(), s))
  39. break
  40. def pid(): return eventlet.greenthread.getcurrent()
  41. if __name__ == ‘__main__‘:
  42. run_benchmark(int(sys.argv[1]), int(sys.argv[2]))

Python幾種並發實現方案的性能比較