Queue(佇列物件)多執行緒
阿新 • • 發佈:2019-01-06
Queue是python中的標準庫,可以直接import Queue引用;佇列是執行緒間最常用的交換資料的形式
python下多執行緒的思考
對於資源,加鎖是個重要的環節。因為python原生的list,dict等,都是not thread safe的。而Queue,是執行緒安全的,因此在滿足使用條件下,建議使用佇列
初始化: class Queue.Queue(maxsize) FIFO 先進先出
包中的常用方法:
Queue.qsize() 返回佇列的大小
Queue.empty() 如果佇列為空,返回True,反之False
Queue.full() 如果佇列滿了,返回True,反之False
Queue.full 與 maxsize 大小對應
Queue.get([block[, timeout]])獲取佇列,timeout等待時間
建立一個“佇列”物件
- import Queue
- myqueue = Queue.Queue(maxsize = 10)
將一個值放入佇列中
- myqueue.put(10)
將一個值從佇列中取出
- myqueue.get()
多執行緒示意圖
# -*- coding:utf-8 -*-
import requests
from lxml import etree
from Queue import Queue
import threading
import time
import json
class thread_crawl (threading.Thread):
'''
抓取執行緒類
'''
def __init__(self, threadID, q):
threading.Thread.__init__(self)
self.threadID = threadID
self.q = q
def run(self):
print "Starting " + self.threadID
self.qiushi_spider()
print "Exiting ", self.threadID
def qiushi_spider(self):
# page = 1
while True:
if self.q.empty():
break
else:
page = self.q.get()
print 'qiushi_spider=', self.threadID, ',page=', str(page)
url = 'http://www.qiushibaike.com/8hr/page/' + str(page) + '/'
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36',
'Accept-Language': 'zh-CN,zh;q=0.8'}
# 多次嘗試失敗結束、防止死迴圈
timeout = 4
while timeout > 0:
timeout -= 1
try:
content = requests.get(url, headers=headers)
data_queue.put(content.text)
break
except Exception, e:
print 'qiushi_spider', e
if timeout < 0:
print 'timeout', url
class Thread_Parser(threading.Thread):
'''
頁面解析類;
'''
def __init__(self, threadID, queue, lock, f):
threading.Thread.__init__(self)
self.threadID = threadID
self.queue = queue
self.lock = lock
self.f = f
def run(self):
print 'starting ', self.threadID
global total, exitFlag_Parser
while not exitFlag_Parser:
try:
'''
呼叫佇列物件的get()方法從隊頭刪除並返回一個專案。可選引數為block,預設為True。
如果佇列為空且block為True,get()就使呼叫執行緒暫停,直至有專案可用。
如果佇列為空且block為False,佇列將引發Empty異常。
'''
item = self.queue.get(False)
if not item:
pass
self.parse_data(item)
self.queue.task_done()
print 'Thread_Parser=', self.threadID, ',total=', total
except:
pass
print 'Exiting ', self.threadID
def parse_data(self, item):
'''
解析網頁函式
:param item: 網頁內容
:return:
'''
global total
try:
html = etree.HTML(item)
result = html.xpath('//div[contains(@id,"qiushi_tag")]')
for site in result:
try:
imgUrl = site.xpath('.//img/@src')[0]
title = site.xpath('.//h2')[0].text
content = site.xpath('.//div[@class="content"]/span')[0].text.strip()
vote = None
comments = None
try:
vote = site.xpath('.//i')[0].text
comments = site.xpath('.//i')[1].text
except:
pass
result = {
'imgUrl': imgUrl,
'title': title,
'content': content,
'vote': vote,
'comments': comments,
}
with self.lock:
# print 'write %s' % json.dumps(result)
self.f.write(json.dumps(result, ensure_ascii=False).encode('utf-8') + "\n")
except Exception, e:
print 'site in result', e
except Exception, e:
print 'parse_data', e
with self.lock:
total += 1
data_queue = Queue()
exitFlag_Parser = False
lock = threading.Lock()
total = 0
def main():
output = open('qiushibaike.json', 'a')
#初始化網頁頁碼page從1-10個頁面
pageQueue = Queue(50)
for page in range(1, 11):
pageQueue.put(page)
#初始化採集執行緒
crawlthreads = []
crawlList = ["crawl-1", "crawl-2", "crawl-3"]
for threadID in crawlList:
thread = thread_crawl(threadID, pageQueue)
thread.start()
crawlthreads.append(thread)
#初始化解析執行緒parserList
parserthreads = []
parserList = ["parser-1", "parser-2", "parser-3"]
#分別啟動parserList
for threadID in parserList:
thread = Thread_Parser(threadID, data_queue, lock, output)
thread.start()
parserthreads.append(thread)
# 等待佇列清空
while not pageQueue.empty():
pass
# 等待所有執行緒完成
for t in crawlthreads:
t.join()
while not data_queue.empty():
pass
# 通知執行緒是時候退出
global exitFlag_Parser
exitFlag_Parser = True
for t in parserthreads:
t.join()
print "Exiting Main Thread"
with lock:
output.close()
if __name__ == '__main__':
main()