1. 程式人生 > >python高性能代碼之多線程優化

python高性能代碼之多線程優化

pty star try div sub cal erro host ble

以常見的端口掃描器為實例

端口掃描器的原理很簡單,操作socket來判斷連接狀態確定主機端口的開放情況。

技術分享圖片
import socket 
def scan(port): 
  s = socket.socket() 
  if s.connect_ex((‘localhost‘, port)) == 0: 
    print port, ‘open‘ 
  s.close() 
if __name__ == ‘__main__‘: 
  map(scan,range(1,65536)) 
技術分享圖片

這是一個socket掃描器的基本代碼。

但是如果直接運行會等待很長時間都沒有反應,這是因為socket是阻塞的,到等待每個連接超時後才會進入下一個連接。

給這段代碼加一個超時

s.settimeout(0.1)

完整的代碼如下

技術分享圖片
import socket 
def scan(port): 
  s = socket.socket() 
  s = settimeont(0.1)
  if s.connect_ex((‘localhost‘, port)) == 0: 
    print port, ‘open‘ 
  s.close() 
if __name__ == ‘__main__‘: 
  map(scan,range(1,65536)) 
技術分享圖片

本文的重點不在於掃描器功能部分。而重點在於代碼質量的提升和優化從而提升代碼的運行效率。

多線程版本:

技術分享圖片
import socket 
import threading 
def scan(port): 
  s = socket.socket() 
  s.settimeout(0.1) 
  if s.connect_ex((‘localhost‘, port)) == 0: 
    print port, ‘open‘ 
  s.close() 
 
if __name__ == ‘__main__‘: 
  threads = [threading.Thread(target=scan, args=(i,)) for i in xrange(1,65536)] 
  map(lambda x:x.start(),threads) 
技術分享圖片

Run起來,速度確實快了不少,但是拋出了異常:thread.error: can‘t start new thread

這個進程開啟了65535個線程,有兩種可能,一種是超過最大線程數了,一種是超過最大socket句柄數了。在linux可以通過ulimit來修改。
如果不修改最大限制,怎麽用多線程不報錯呢?
加個queue,變成生產者-消費者模式,開固定線程。

多線程+隊列版本:

技術分享圖片
import socket 
import threading 
from Queue import Queue 
def scan(port): 
  s = socket.socket() 
  s.settimeout(0.1) 
  if s.connect_ex((‘localhost‘, port)) == 0: 
    print port, ‘open‘ 
  s.close() 
 
def worker(): 
  while not q.empty(): 
    port = q.get() 
    try: 
      scan(port) 
    finally: 
      q.task_done() 
 
if __name__ == ‘__main__‘: 
  q = Queue() 
  map(q.put,xrange(1,65535)) 
  threads = [threading.Thread(target=worker) for i in xrange(500)] 
  map(lambda x:x.start(),threads) 
  q.join() 
技術分享圖片

開500個線程,不停的從隊列中取出任務來進行...

multiprocessing + 隊列版本:

總不能開65535個進程吧?還是用生產者消費者模式

技術分享圖片
import socket 
import multiprocessing def scan(port): s = socket.socket() s.settimeout(0.1) if s.connect_ex((‘localhost‘, port)) == 0: print port, ‘open‘ s.close() def worker(q): while not q.empty(): port = q.get() try: scan(port) finally: q.task_done() if __name__ == ‘__main__‘: q = multiprocessing.JoinableQueue() map(q.put,xrange(1,65535)) jobs = [multiprocessing.Process(target=worker, args=(q,)) for i in xrange(100)] map(lambda x:x.start(),jobs)
技術分享圖片

註意這裏把隊列作為一個參數傳入到worker中去,因為是process safe的queue,不然會報錯。
還有用的是JoinableQueue(),顧名思義就是可以join()的。

gevent的spawn版本:

技術分享圖片
from gevent import monkey; monkey.patch_all(); 
import gevent 
import socket 
... 
if __name__ == ‘__main__‘: 
  threads = [gevent.spawn(scan, i) for i in xrange(1,65536)] 
  gevent.joinall(threads) 
技術分享圖片

註意monkey patch必須在被patch的東西之前import,不然會Exception KeyError.比如不能先import threading,再monkey patch.

gevent的Pool版本:

技術分享圖片
from gevent import monkey; monkey.patch_all(); 
import socket 
from gevent.pool import Pool 
... 
if __name__ == ‘__main__‘: 
  pool = Pool(500) 
  pool.map(scan,xrange(1,65536)) 
  pool.join() 
技術分享圖片

concurrent.futures版本:

技術分享圖片
import socket 
from Queue import Queue 
from concurrent.futures import ThreadPoolExecutor 
... 
if __name__ == ‘__main__‘: 
  q = Queue() 
  map(q.put,xrange(1,65536)) 
  with ThreadPoolExecutor(max_workers=500) as executor: 
    for i in range(500): 
      executor.submit(worker,q) 
技術分享圖片

python高性能代碼之多線程優化