
Implementing a Web Crawler with a Python Thread Pool

http://blog.daviesliu.net/2006/10/09/234822/

First, build the thread pool:

The pool is built around two queues and a list of worker threads: the user adds tasks to the work queue, every started worker thread loops pulling tasks from it, and each task's return value is placed on a result queue.
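Concretely, each task travels through the work queue as a plain (callable, args, kwds) tuple, and whatever the callable returns goes onto the result queue. A minimal runnable sketch of that handshake, before the full implementation below (download is a hypothetical job function; the queue names mirror the code that follows):

import Queue

workQueue, resultQueue = Queue.Queue(), Queue.Queue()

def download(url):                       # hypothetical job function
    return "fetched " + url

# producer side -- this is exactly what add_job() does
workQueue.put( (download, ("http://example.com",), {}) )

# worker side -- this is the heart of Worker.run()
callable, args, kwds = workQueue.get(timeout=1)
resultQueue.put( callable(*args, **kwds) )
print resultQueue.get()                  # -> fetched http://example.com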

import Queue, sys
from threading import Thread
   
# working thread  
class Worker(Thread):  
    worker_count = 0  
    timeout = 1  
    def __init__( self, workQueue, resultQueue, **kwds):
        Thread.__init__( self, **kwds )
        self.id = Worker.worker_count
        Worker.worker_count += 1
        # daemon threads will not keep the process alive on exit;
        # each worker starts running as soon as it is constructed
        self.setDaemon( True )
        self.workQueue = workQueue
        self.resultQueue = resultQueue
        self.start( )
   
    def run( self ):
        ''' the get-some-work, do-some-work main loop of worker threads '''
        while True:
            try:
                # block for up to Worker.timeout seconds waiting for a task
                callable, args, kwds = self.workQueue.get(timeout=Worker.timeout)
                res = callable(*args, **kwds)
                print "worker[%2d]: %s" % (self.id, str(res))
                self.resultQueue.put(res)
            except Queue.Empty:
                # no task arrived within the timeout: assume the queue is
                # drained and let this worker exit
                break
            except:
                print 'worker[%2d]' % self.id, sys.exc_info()[:2]
                raise
                 
class WorkerManager:  
    def __init__( self, num_of_workers=10, timeout = 2):  
        self.workQueue = Queue.Queue()  
        self.resultQueue = Queue.Queue()  
        self.workers = []  
        self.timeout = timeout  
        self._recruitThreads( num_of_workers )  
   
    def _recruitThreads( self, num_of_workers ):  
        for i in range( num_of_workers ):  
            worker = Worker( self.workQueue, self.resultQueue )  
            self.workers.append(worker)  
   
    def wait_for_complete( self ):
        # ...then, wait for each of them to terminate; a worker that is
        # still alive while the queue is non-empty is pushed back and
        # joined again on the next pass
        while len(self.workers):
            worker = self.workers.pop()
            worker.join( self.timeout )
            if worker.isAlive() and not self.workQueue.empty():
                self.workers.append( worker )
        print "All jobs are completed."
   
    def add_job( self, callable, *args, **kwds ):  
        self.workQueue.put( (callable, args, kwds) )  
   
    def get_result( self, *args, **kwds ):  
        return self.resultQueue.get( *args, **kwds )  

Here is how the thread pool is used (the pool code above is assumed to be saved as thread_pool.py, which the import below pulls in):
import urllib2
import socket
from thread_pool import *
   
def main():  
    url_list = {"sina":"http://www.sina.com.cn",  
               "sohu":"http://www.sohu.com",  
               "yahoo":"http://www.yahoo.com",  
               "xiaonei":"http://www.xiaonei.com",  
               "qihoo":"http://www.qihoo.com",  
               "laohan":"http://www.laohan.org",  
               "eyou":"http://www.eyou.com",  
               "chinaren":"http://www.chinaren.com",  
               "douban":"http://www.douban.com",  
               "163":"http://www.163.com",  
               "daqi":"http://www.daqi.com",  
               "qq":"http://www.qq.com",  
               "baidu_1":"http://www.baidu.com/s?wd=asdfasdf",  
               "baidu_2":"http://www.baidu.com/s?wd=dddddddf",  
               "google_1":"http://www.baidu.com/s?wd=sadfas",  
               "google_2":"http://www.baidu.com/s?wd=sadflasd",  
               "hainei":"http://www.hainei.com",  
               "microsoft":"http://www.microsoft.com",  
               "wlzuojia":"http://www.wlzuojia.com"}  
   
    socket.setdefaulttimeout(10)  
    print 'start testing'  
    wm = WorkerManager(50)  
    for url_name, url_link in url_list.items():
        wm.add_job(do_get_con, url_name, url_link)
    wm.wait_for_complete()  
    print 'end testing'  
   
def do_get_con(url_name, url_link):
    ''' fetch one URL and save its body under /tmp/ttt/ (the directory must already exist) '''
    try:
        fd = urllib2.urlopen(url_link)
        data = fd.read()
        fd.close()
        f_hand = open("/tmp/ttt/%s" % url_name, "w")
        f_hand.write(data)
        f_hand.close()
        return url_name
    except Exception, e:
        # report the failure instead of silently swallowing it
        print "failed to fetch %s: %s" % (url_link, e)
   
if __name__ == "__main__":  
    main()
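One thing the demo leaves out: nothing ever reads the result queue, so each job's return value just accumulates inside the WorkerManager. A minimal sketch of draining it with get_result, meant to go at the end of main() after wm.wait_for_complete() (it assumes do_get_con returns something on success, e.g. the url_name as in the version above):

    results = []
    while True:
        try:
            # block=False raises Queue.Empty once the queue is drained
            # (the Queue module arrives via the star import of thread_pool)
            res = wm.get_result(block=False)
        except Queue.Empty:
            break
        if res is not None:              # failed fetches returned None
            results.append(res)
    print "fetched %d pages:" % len(results), results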