1. 程式人生 > >python 多線程糗事百科案例

python 多線程糗事百科案例

wow64 案例 sts ascii starting 頁面 don 示意圖 utf-8

案例要求參考上一個糗事百科單進程案例

Queue(隊列對象)

Queue是python中的標準庫,可以直接import Queue引用;隊列是線程間最常用的交換數據的形式

python下多線程的思考

對於資源,加鎖是個重要的環節。因為python原生的list,dict等,都是not thread safe的。而Queue,是線程安全的,因此在滿足使用條件下,建議使用隊列

  1. 初始化: class Queue.Queue(maxsize) FIFO 先進先出

  2. 包中的常用方法:

    • Queue.qsize() 返回隊列的大小

    • Queue.empty() 如果隊列為空,返回True,反之False

    • Queue.full() 如果隊列滿了,返回True,反之False

    • Queue.full 與 maxsize 大小對應

    • Queue.get([block[, timeout]])獲取隊列,timeout等待時間

  3. 創建一個“隊列”對象

    • import Queue
    • myqueue = Queue.Queue(maxsize = 10)
  4. 將一個值放入隊列中

    • myqueue.put(10)
  5. 將一個值從隊列中取出

    • myqueue.get()

多線程示意圖

技術分享

# -*- coding:utf-8 -*-
import requests
from lxml import etree
from Queue import Queue
import threading
import time
import json


class thread_crawl(threading.Thread):
    ‘‘‘
    抓取線程類
    ‘‘‘

    def __init__(self, threadID, q):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.q = q

    def run(self):
        print "Starting " + self.threadID
        self.qiushi_spider()
        print "Exiting ", self.threadID

    def qiushi_spider(self):
        # page = 1
        while True:
            if self.q.empty():
                break
            else:
                page = self.q.get()
                print ‘qiushi_spider=‘, self.threadID, ‘,page=‘, str(page)
                url = ‘http://www.qiushibaike.com/8hr/page/‘ + str(page) + ‘/‘
                headers = {
                    ‘User-Agent‘: ‘Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36‘,
                    ‘Accept-Language‘: ‘zh-CN,zh;q=0.8‘}
                # 多次嘗試失敗結束、防止死循環
                timeout = 4
                while timeout > 0:
                    timeout -= 1
                    try:
                        content = requests.get(url, headers=headers)
                        data_queue.put(content.text)
                        break
                    except Exception, e:
                        print ‘qiushi_spider‘, e
                if timeout < 0:
                    print ‘timeout‘, url


class Thread_Parser(threading.Thread):
    ‘‘‘
    頁面解析類;
    ‘‘‘

    def __init__(self, threadID, queue, lock, f):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.queue = queue
        self.lock = lock
        self.f = f

    def run(self):
        print ‘starting ‘, self.threadID
        global total, exitFlag_Parser
        while not exitFlag_Parser:
            try:
                ‘‘‘
                調用隊列對象的get()方法從隊頭刪除並返回一個項目。可選參數為block,默認為True。
                如果隊列為空且block為True,get()就使調用線程暫停,直至有項目可用。
                如果隊列為空且block為False,隊列將引發Empty異常。
                ‘‘‘
                item = self.queue.get(False)
                if not item:
                    pass
                self.parse_data(item)
                self.queue.task_done()
                print ‘Thread_Parser=‘, self.threadID, ‘,total=‘, total
            except:
                pass
        print ‘Exiting ‘, self.threadID

    def parse_data(self, item):
        ‘‘‘
        解析網頁函數
        :param item: 網頁內容
        :return:
        ‘‘‘
        global total
        try:
            html = etree.HTML(item)
            result = html.xpath(‘//div[contains(@id,"qiushi_tag")]‘)
            for site in result:
                try:
                    imgUrl = site.xpath([email protected]
/* */)[0] title = site.xpath(‘.//h2‘)[0].text content = site.xpath(‘.//div[@class="content"]/span‘)[0].text.strip() vote = None comments = None try: vote = site.xpath(‘.//i‘)[0].text comments = site.xpath(‘.//i‘)[1].text except: pass result = { ‘imgUrl‘: imgUrl, ‘title‘: title, ‘content‘: content, ‘vote‘: vote, ‘comments‘: comments, } with self.lock: # print ‘write %s‘ % json.dumps(result) self.f.write(json.dumps(result, ensure_ascii=False).encode(‘utf-8‘) + "\n") except Exception, e: print ‘site in result‘, e except Exception, e: print ‘parse_data‘, e with self.lock: total += 1 data_queue = Queue() exitFlag_Parser = False lock = threading.Lock() total = 0 def main(): output = open(‘qiushibaike.json‘, ‘a‘) #初始化網頁頁碼page從1-10個頁面 pageQueue = Queue(50) for page in range(1, 11): pageQueue.put(page) #初始化采集線程 crawlthreads = [] crawlList = ["crawl-1", "crawl-2", "crawl-3"] for threadID in crawlList: thread = thread_crawl(threadID, pageQueue) thread.start() crawlthreads.append(thread) #初始化解析線程parserList parserthreads = [] parserList = ["parser-1", "parser-2", "parser-3"] #分別啟動parserList for threadID in parserList: thread = Thread_Parser(threadID, data_queue, lock, output) thread.start() parserthreads.append(thread) # 等待隊列清空 while not pageQueue.empty(): pass # 等待所有線程完成 for t in crawlthreads: t.join() while not data_queue.empty(): pass # 通知線程是時候退出 global exitFlag_Parser exitFlag_Parser = True for t in parserthreads: t.join() print "Exiting Main Thread" with lock: output.close() if __name__ == ‘__main__‘: main()

python 多線程糗事百科案例