
spider----a multithreaded 51job crawling example

The complete code is as follows:

import json
from threading import Thread
from threading import Lock
from queue import Queue

import requests
from bs4 import BeautifulSoup
import time

# Two global flags that tell the crawl threads and parse threads when to keep running
g_crawl = True
g_parse = True


class CrawThread(Thread):
    def __init__(self, name, page_queue, data_queue):
        super().__init__()
        self.name = name
        self.page_queue = page_queue
        self.url = 'https://search.51job.com/list/170200,000000,0000,00,9,99,c,2,{}.html'
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36'
        }
        self.data_queue = data_queue

    def run(self):
        print('---crawl thread %s--- starting' % self.name)
        while g_crawl:
            try:
                # Non-blocking get: raises queue.Empty when no page number is left
                page = self.page_queue.get(False)
                url = self.url.format(page)
                r = requests.get(url=url, headers=self.headers)
                # Hand the raw response body over to the parse threads
                self.data_queue.put(r.content)
                time.sleep(1)
            except Exception:
                # Queue is (momentarily) empty; keep looping until g_crawl is cleared
                pass
        print('---crawl thread %s--- finished---' % self.name)


class ParseThread(Thread):
    def __init__(self, name, data_queue, fp, lock, page_queue):
        super().__init__()
        self.name = name
        self.data_queue = data_queue
        self.fp = fp
        self.lock = lock
        self.page_queue = page_queue

    def run(self):
        print('---parse thread %s--- starting' % self.name)
        while g_parse:
            try:
                # Non-blocking get: raises queue.Empty when nothing is waiting
                data = self.data_queue.get(False)
                # Parse the raw HTML and write the results to the file
                self.parse(data)
                time.sleep(1)
            except Exception:
                pass
        print('---parse thread %s--- finished---' % self.name)

    def parse(self, data):
        soup = BeautifulSoup(data, 'lxml')
        # Skip the first .el row, which is the table header
        rets = soup.select('#resultList > .el')[1:]
        # select() returns a list, so iterate over every job row
        for ret in rets:
            # Take the first match, then strip newlines and surrounding spaces
            # job title
            title = ret.select('.t1 a')[0].string.replace('\n', '').strip()
            # company
            job_name = ret.select('.t2 a')[0].string.replace('\n', '').strip()
            # location
            job_where = ret.select('.t3')[0].string
            # salary
            salary = ret.select('.t4')[0].string
            # publish date
            publish_time = ret.select('.t5')[0].string
            item = {
                'title': title,
                'company': job_name,
                'location': job_where,
                'salary': salary,
                'publish_time': publish_time,
            }
            # Serialize the dict to a JSON string, keeping non-ASCII characters readable
            string = json.dumps(item, ensure_ascii=False)
            # One record per line; the lock keeps writes from different threads from interleaving
            with self.lock:
                self.fp.write(string + '\n')


def create_queue():
    # queue of page numbers to crawl
    page_queue = Queue(10)
    # queue of raw responses waiting to be parsed
    data_queue = Queue(10)
    for page in range(1, 11):
        page_queue.put(page)
    return page_queue, data_queue


def main():
    # one lock shared by all parse threads for file writes
    lock = Lock()
    # output file
    fp = open('jobs.txt', 'w', encoding='utf8')
    # build the page-number queue and the response queue
    page_queue, data_queue = create_queue()
    # keep every thread in a list so they can be joined later
    crawl_thread_list = []
    parse_thread_list = []
    crawl_name_list = ['crawl-1', 'crawl-2', 'crawl-3']
    parse_name_list = ['parse-1', 'parse-2', 'parse-3']

    for name in crawl_name_list:
        t_crawl = CrawThread(name, page_queue, data_queue)
        # start the crawl thread
        t_crawl.start()
        crawl_thread_list.append(t_crawl)

    for name in parse_name_list:
        t_parse = ParseThread(name, data_queue, fp, lock, page_queue)
        # start the parse thread
        t_parse.start()
        parse_thread_list.append(t_parse)
    global g_crawl, g_parse
    # wait until every page number has been taken, then stop the crawl threads
    while 1:
        if page_queue.empty():
            g_crawl = False
            break
    for crawl in crawl_thread_list:
        crawl.join()
    # once the crawl threads are done, wait for the response queue to drain
    while 1:
        if data_queue.empty():
            g_parse = False
            break
    for parse in parse_thread_list:
        parse.join()
    fp.close()
    print('main thread: all worker threads have finished')


if __name__ == '__main__':
    main()
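
As a side note, the global flags plus busy-wait loops in main() can also be expressed with the standard library's Queue.task_done()/Queue.join() hand-off, which blocks instead of spinning. The sketch below is not part of the original code; worker and run are hypothetical names used only to illustrate the same producer/consumer coordination.

from queue import Queue
from threading import Thread

def worker(data_queue):
    # keep pulling work until a None sentinel arrives
    while True:
        data = data_queue.get()        # blocks until an item is available
        if data is None:
            data_queue.task_done()
            break
        # ... fetch/parse `data` here ...
        data_queue.task_done()         # mark this item as processed

def run(num_workers=3):
    data_queue = Queue()
    workers = [Thread(target=worker, args=(data_queue,)) for _ in range(num_workers)]
    for w in workers:
        w.start()
    for page in range(1, 11):
        data_queue.put(page)           # producer side: enqueue work items
    data_queue.join()                  # block until every item is task_done()
    for _ in workers:
        data_queue.put(None)           # one sentinel per worker to shut it down
    for w in workers:
        w.join()

With this pattern no global flags are needed: join() returns once every queued item has been marked done, and the sentinels let each worker exit cleanly.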