1. 程式人生 > >Python使用多執行緒(附:爬蟲使用的執行緒池)

Python使用多執行緒(附:爬蟲使用的執行緒池)

python開啟多執行緒。

使用的庫:
    python 3.+ :threading(較高階,常用),   _thread(python2.+中叫 thread)(偏底層)
    python 2.+ :thread

實現多執行緒:(python3.6,使用 threading 庫)
   1:函式實現

 #第一種:通過函式建立執行緒
 def 函式a():
     pass

# 獲得一個執行緒物件。
 t = threading.Thread(target=函式a的名字,name=自己隨便取的執行緒名字,args=(引數1,...))   

# 啟動執行緒 
 t.start();


   2:繼承執行緒類(類實現)

class Fetcher(threading.Thread):
        def __init__(self):
            Thread.__init__(self):

            #加這一步的目的是保證,主執行緒退出後子執行緒也會跟著中斷退出
            self.daemon = True
    # 必須要寫
        def run(self):
            #執行緒執行的函式
            pass

# 獲得物件,並啟動執行緒
t = Fetcher()
t.start()

鎖:

執行緒同時操作一個全域性變數時會產生執行緒競爭所以,需要鎖來避免競爭。
實現鎖:

 lock = threading.Lock()
 
 lock.acquire()      #獲得鎖,加鎖

 #..操作全域性變數..(寫檔案,寫入資料庫等)

 lock.release()      #釋放鎖

佇列:(執行緒池中使用)

多執行緒同步就是多個執行緒競爭一個全域性變數時按順序讀寫,一般情況下要用鎖,但是使用標準庫裡的Queue的時候它內部已經實現了鎖,不用程式設計師自己寫了。

使用佇列(只介紹線上程池中的常用方法):

# 匯入佇列類:

from queue import Queue

# 建立一個佇列:

q = Queue(maxsize=0)   # maxsize為佇列大小,為0預設佇列大小可無窮大。

# 佇列是先進先出的資料結構:

q.put(item) #往佇列新增一個item,佇列滿了則阻塞
q.get(item) #從佇列得到一個item,佇列為空則阻塞

還有相應的不等待的版本,這裡略過。

佇列不為空,或者為空但是取得item的執行緒沒有告知任務完成時都是處於阻塞狀態

q.join()    #阻塞直到所有任務完成

# 執行緒告知任務當前任務已結束,可以開始下次任務使用 task_done() 函式

q.task_done() #線上程內呼叫

==============================實現簡單執行緒池(爬蟲使用)==================================
1:建立一個執行緒類:

class TestThread(threading.Thread):
    def __init__(self,task,lock):
        threading.Thread.__init__(self);
        self.task = task ;  # 一個佇列,用來存放要爬取的url
        self.lock = lock ; # 一個鎖物件,用來保護存入資料庫等的操作
        self.daemon = True; # 當主執行緒退出時,子執行緒也退出。
        self.start(); # 運行當前執行緒。
    
    # 執行緒中的主執行方法
    def run(self):
        while True:
            # 從佇列中獲取url,若沒有則一直阻塞,直到獲取到一個url。
            url = self.task.get(); 
            """
            這裡進行爬資料的操作
            """
            # 向任務稟報當前任務執行結束,可以開始下一任務
            self.task.task_done();

2:建立一個執行緒池類

class ThreadPool():
    def __init__(self,thread_num,lock):
        self.task = queue.Queue(); # 生成一個佇列,用來儲存要爬取的連線
        self.lock = lock; # 一個鎖物件,用來保護寫入檔案

    # 根據傳入的執行緒數目,建立對應個執行緒
        for i in range(thread_num):
            TestThread(self.task,lock);

    def add_task(self,url):
        # 往佇列中放url,直到佇列滿則阻塞。
        self.task.put(url)

    def wait_complite(self):
        # 阻塞佇列中的所有執行緒,直到主執行緒執行完畢
        self.task.join();

 
3:使用

if __name__ == "__main__":
    print("開始於:",start)
        lock = threading.Lock();
        pool = ThreadPool(2,lock);
        for i in range(21):
            pool.add_task(i)
        pool.wait_complite();
        print("主執行緒結束");
        print("共用時:",time.time()-start)