
Python crawler with proxies: a BeautifulSoup + requests + MySQL scraping example

Implementation approach:

Because the target site has anti-scraping measures, the crawler needs to switch proxies while fetching. The page content is parsed with BeautifulSoup and finally written to a MySQL database.

1. Fetch proxy IPs from the Xici free proxy site (xicidaili.com) and test each one to see whether it is usable.

2. Send requests with the requests module through one of the verified proxy IPs (a minimal sketch follows this list).

3. Parse the content and insert it into the database.
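Before the full scripts, here is a minimal sketch of the idea behind step 2: the requests module accepts a proxies dict, so switching proxies is simply a matter of picking a different entry before each call. The pool below is a placeholder (the single address is taken from the test snippet at the end of the first script); the real pool is built by the IPProxyPool class shown later.

# -*- coding: utf-8 -*-
import random
import requests

# placeholder pool; in the real scripts it is filled by IPProxyPool.get_ip_list()
proxy_pool = ["http://118.190.95.35:9001"]

proxies = {"http": random.choice(proxy_pool)}  # rotate: pick a random proxy per request
headers = {"User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.79 Safari/537.36"}

try:
    res = requests.get("http://www.microbell.com/", headers=headers, proxies=proxies, timeout=5)
    print res.status_code
except Exception as e:
    # a dead proxy just means we drop it and try another one
    print e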

Note: the log_config logging module used below was covered in the previous post.
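If you do not have that module at hand, a minimal stand-in with the same getlogger(name, file_name) signature used below could look like this; the interface is inferred from how it is called here, so treat it as an assumption:

# -*- coding: utf-8 -*-
# log_config.py -- minimal stand-in for the module from the previous post
import logging


def getlogger(name, file_name):
    logger = logging.getLogger(name)
    if not logger.handlers:
        logger.setLevel(logging.INFO)
        handler = logging.FileHandler(file_name)
        handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(message)s'))
        logger.addHandler(handler)
    return logger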

The full code follows.

1. Collecting usable proxies

# -*- coding: utf-8 -*-
import random
import time
import requests
from bs4 import BeautifulSoup
import log_config
logger = log_config.getlogger('ip_pool', 'ip_pool.log')


class IPProxyPool:
    # Initialise with an empty list, ip_list, used to store working proxies
    def __init__(self):
        # Pages of the Xici free proxy site to scrape proxy IPs from
        self.proxy_url_list = ['http://www.xicidaili.com', 'http://www.xicidaili.com/nn',
                               'http://www.xicidaili.com/nn/2']
        self.ip_list = []
        self.headers = {'Host': 'www.xicidaili.com',
                        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.79 Safari/537.36'
                        }
        self.user_agent_list = [
            "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1",
            "Mozilla/5.0 (X11; CrOS i686 2268.111.0) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.57 Safari/536.11",
            "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.6 (KHTML, like Gecko) Chrome/20.0.1092.0 Safari/536.6",
            "Mozilla/5.0 (Windows NT 6.2) AppleWebKit/536.6 (KHTML, like Gecko) Chrome/20.0.1090.0 Safari/536.6",
            "Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/19.77.34.5 Safari/537.1",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/536.5 (KHTML, like Gecko) Chrome/19.0.1084.9 Safari/536.5",
            "Mozilla/5.0 (Windows NT 6.0) AppleWebKit/536.5 (KHTML, like Gecko) Chrome/19.0.1084.36 Safari/536.5",
            "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1063.0 Safari/536.3",
            "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1063.0 Safari/536.3",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_8_0) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1063.0 Safari/536.3",
            "Mozilla/5.0 (Windows NT 6.2) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1062.0 Safari/536.3",
            "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1062.0 Safari/536.3",
            "Mozilla/5.0 (Windows NT 6.2) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1061.1 Safari/536.3",
            "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1061.1 Safari/536.3",
            "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1061.1 Safari/536.3",
            "Mozilla/5.0 (Windows NT 6.2) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1061.0 Safari/536.3",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/535.24 (KHTML, like Gecko) Chrome/19.0.1055.1 Safari/535.24",
            "Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/535.24 (KHTML, like Gecko) Chrome/19.0.1055.1 Safari/535.24"
        ]

    # Scrape every candidate proxy row from the Xici pages
    def get_xici_all_ip(self):
        ip_lists = []
        for proxy_url in self.proxy_url_list:
            html = requests.get(proxy_url, headers=self.headers)
            content = html.text
            soup = BeautifulSoup(content, "lxml")
            ip_body = soup.find("table", attrs={"id": "ip_list"})
            ip_page_lists = ip_body.find_all("tr", attrs={"class": "odd"})
            ip_lists = ip_lists + ip_page_lists
        return ip_lists

    # Build "http://ip:port" strings from the scraped rows and keep the ones that pass the check
    def get_ip_list(self):
        ip_lists = self.get_xici_all_ip()
        ip_test_pool = []
        for ip in ip_lists:
            http_type = ip.find_all("td")[5].get_text()
            if http_type == 'HTTP':
                ip_test_account = ip.find_all("td")[1].get_text()
                ip_test_port = ip.find_all("td")[2].get_text()
                ip_port_dict = {ip_test_account: ip_test_port}
                ip_test_pool.append(ip_port_dict)
        for ipn in ip_test_pool:
            ip_addr = "http://"
            for ip, port in ipn.items():
                ip_addr = ip_addr + ip + ':' + port
            # Check whether the proxy actually works
            status = self.check_ip(ip_addr)
            if status:
                # Keep the working proxy in ip_list
                self.ip_list.append(ip_addr.strip())

    def check_ip(self, ip):
        return self.microbell_proxy_ip(ip)

    def microbell_proxy_ip(self, ip):
        try:
            test_url = 'http://www.microbell.com/elitelist.html'
            proxy = {'http': ip}
            user_agent = self.random_agent()
            headers_agent = {'User-Agent': user_agent}
            response_body = requests.get(test_url, headers=headers_agent, proxies=proxy, timeout=5)
            if response_body.status_code == 200:
                # Even with a 200 status the proxy may have returned its own page instead of the
                # target page, so verify the content as well
                # response_body.encoding('gbk')
                content = response_body.text
                soup = BeautifulSoup(content, "lxml")
                body = soup.find("div", attrs={"class": "index_docmain"})
                if body is None:
                    return False
                if body.get_text() != "":
                    logger.info("ok proxy ip %s" % ip)
                    return True
                else:
                    return False
            else:
                return False
        except Exception as e:
            logger.exception(e)
            time.sleep(1)
            return False

    def random_agent(self):
        user_agent = random.choice(self.user_agent_list)
        return user_agent


if __name__ == "__main__":
    ip_proxy_pool = IPProxyPool()
    ip_proxy_pool.get_ip_list()
    print ip_proxy_pool.ip_list
    # proxies = {
    #     "http": "http://118.190.95.35:9001"  # proxy ip
    # }
    #
    # headers = {
    #     'Host': 'www.4399.com',
    #     'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.79 Safari/537.36'
    # }
    #
    # http_url = "http://www.4399.com/"
    # try:
    #     res = requests.get(url=http_url, headers=headers, proxies=proxies, timeout=3)
    #     if res.status_code == 200:
    #         print u"page fetched successfully"
    #     else:
    #         print "failed"
    # except Exception as e:
    #     print e
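The key point in microbell_proxy_ip is that a 200 status alone is not trusted: a free proxy may answer with its own landing page, so the checker also looks for the index_docmain div that only the real target page contains. Assuming the class above is saved as agent_and_proxy_ip_pool.py (the module name imported by the second script), exercising it from another script could look like this sketch:

# -*- coding: utf-8 -*-
import random
import requests
from agent_and_proxy_ip_pool import IPProxyPool

pool = IPProxyPool()
pool.get_ip_list()  # may take a while: every candidate proxy is tested against the target site
if pool.ip_list:
    proxies = {'http': random.choice(pool.ip_list)}
    headers = {'User-Agent': pool.random_agent()}
    res = requests.get('http://www.microbell.com/elitelist.html', headers=headers, proxies=proxies, timeout=5)
    print res.status_code
else:
    print 'no usable proxy found'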

2. Parsing the pages, extracting the desired content, and writing it to the database

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import ConfigParser
import datetime
import sys
import pymysql
import requests
from bs4 import BeautifulSoup
import log_config
import time
import random
from agent_and_proxy_ip_pool import IPProxyPool

logger = log_config.getlogger('report', 'report.log')

# Whether to fetch only today's data: 0 = today's data only, 1 = all data
# (2 is used internally to mean "page has no data")
get_today_data = 1
if len(sys.argv) != 1:
    if sys.argv[1] == '0':
        get_today_data = 0
    elif sys.argv[1] == '1':
        get_today_data = 1
    else:
        print 'input error, please input 0 -> today, 1 -> all data'
        exit()


class research_report:
    def __init__(self):
        conf = ConfigParser.ConfigParser()
        conf.read("mysql.conf")
        self.ip_proxy_pool = IPProxyPool()
        # self.ip_proxy_pool.get_ip_list()
        # self.ip_pool = self.ip_proxy_pool.ip_list
        # logger.info('You can currently use IP %s' % self.ip_pool)
        # Several usable cookies
        self.cookies_pool = [
            'c=; ASPSESSIONIDQCQRQQCR=LEOOBOJCBAMFFDHMFBHFJKEE; __guid=188006958.3779224451650617000.1539657585525.2588; ASPSESSIONIDSATRTTDQ=MCDEIPFDLLKBNHPBBEMGBGFC; safedog-flow-item=C07B93F771; UM_distinctid=16680b1e9e411f-0674a4c85ccc2-454c092b-1fa400-16680b1e9e539d; CNZZDATA1752123=cnzz_eid%3D2075545357-1539752826-%26ntime%3D1539752826; Hm_lvt_d554f0f6d738d9e505c72769d450253d=1539757436; robih=vXuWjYMDvV6XmNxOuNmP; MBpermission=0; MBname=sunyue1993; did=67A671BFE; monitor_count=6; Hm_lpvt_d554f0f6d738d9e505c72769d450253d=1539757719'
        ]
        self.get_today = get_today_data
        self.user = conf.get("mysql", "user")
        self.mysql_password = conf.get("mysql", "password")
        self.database_name = conf.get("mysql", "database")
        self.host = conf.get("mysql", "host")
        self.port = conf.get("mysql", "port")
        self.site_url = 'http://www.microbell.com/'
        self.page_url = 'http://www.microbell.com/elitelist_1_0.html'
        self.headers = {'Host': 'www.microbell.com',
                        'Accept': 'application/json, text/javascript, */*; q=0.01',
                        'Accept-Encoding': 'gzip, deflate, sdch',
                        'Accept-Language': 'zh-CN,zh;q=0.8',
                        'Connection': 'keep-alive'
                        }

    # Build request headers with a random User-Agent and a random cookie
    def get_random_headers(self):
        # self.headers['User-Agent'] = 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.79 Safari/537.36'
        self.headers['User-Agent'] = self.ip_proxy_pool.random_agent()
        self.headers['Cookie'] = random.choice(self.cookies_pool)

    # Pick a random proxy from the pool
    def get_random_proxy(self):
        proxy_ip = random.choice(self.ip_pool)
        proxies = {'http': proxy_ip}
        return proxies

    # Fetch one list page and collect the URL and title of every report on it
    def get_html_content(self, page_num_url):
        try:
            self.get_random_headers()
            req = requests.get(page_num_url, headers=self.headers, timeout=5)
            req.encoding = 'gbk'
            text = req.text
            soup = BeautifulSoup(text, "lxml")
            # soup = body.prettify  # pretty-print
            report_list = soup.find_all("div", attrs={"class": "classbaogao_sousuo_list"})
            list_data = []
            logger.info("%s has %s entries" % (page_num_url, len(report_list)))
            if len(report_list) == 0:
                return 2
            for report_item in report_list:
                url = self.site_url + report_item.table.tr.find_all("td")[1].a["href"]
                title = report_item.table.tr.find_all("td")[1].a["title"]
                item_data = {"url": url, "title": title}
                list_data.append(item_data)
            end_flag = self.get_list_page_data(list_data)
            return end_flag
        except Exception as e:
            logger.exception("get list %s page fail error info : %s" % (page_num_url, e))
            return 2

    # Fetch the detail page of each of the (up to 38) reports on one list page
    def get_list_page_data(self, list_data):
        try:
            # One list page (38 rows) is written to the database in a single batch
            page_datas = []
            now_date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            self.get_random_headers()
            # proxy_ip = self.get_random_proxy()
            for item_data in list_data:
                retry_num = 0
                while retry_num < 3:
                    try:
                        # Sleep for 3-5 seconds to avoid being blocked
                        t = random.uniform(3, 5)
                        time.sleep(t)
                        req = requests.get(item_data["url"], headers=self.headers, timeout=5)
                        req.encoding = 'gbk'
                        text = req.text
                        soup = BeautifulSoup(text, "lxml")
                        detail_div = soup.find("div", attrs={"class": "leftn2"})
                        tr_s = detail_div.table.find_all("tr")
                        public_time = tr_s[0].find_all("td")[2].span.get_text()
                        if self.get_today == 0:
                            # When only today's data is wanted, stop as soon as a report
                            # published before today is reached
                            logger.info("now crawling only today's data")
                            today = datetime.date.today()
                            today_time = int(time.mktime(today.timetuple()))
                            time_array = time.strptime(public_time, "%Y-%m-%d %H:%M:%S")
                            pub_time = int(time.mktime(time_array))
                            if pub_time < today_time:
                                break
                        abstract_br_replace = soup.find("div", attrs={"class": "p_main"}).p.span
                        str1 = str(abstract_br_replace).replace("<br/>", r"\r\n")
                        abstract_object = BeautifulSoup(str1, "lxml")
                        [s.extract() for s in abstract_object("font")]
                        abstract = abstract_object.get_text()
                        sec_name = tr_s[0].find_all("td")[0].span.get_text()
                        sec_code = tr_s[0].find_all("td")[1].span.get_text()
                        report_type = tr_s[1].find_all("td")[0].span.get_text()
                        doc_type = tr_s[1].find_all("td")[1].span.get_text()
                        author = tr_s[1].find_all("td")[2].span.get_text()
                        provenance = tr_s[2].find_all("td")[0].span.get_text()
                        pages = tr_s[2].find_all("td")[1].span.get_text()
                        rec_rate = tr_s[2].find_all("td")[2].span.get_text()
                        doc_size = tr_s[3].find_all("td")[0].span.get_text()
                        promulgator = tr_s[3].find_all("td")[1].span.get_text()
                        # The document URL used to be parsed from the onclick attribute; that
                        # extraction is disabled, so store an empty doc_url instead of indexing
                        # into an empty split result
                        # doc_url_str = soup.find("div", attrs={"class": "anniu_main"}).a["onclick"]
                        # doc_url_list = doc_url_str.split(",")
                        # doc_url = self.site_url + doc_url_list[2]
                        doc_url = ""
                        title = item_data["title"]
                        create_time = now_date
                        update_time = now_date
                        page_data = [title, sec_name, sec_code, public_time, report_type, doc_type, author,
                                     provenance, pages, rec_rate, doc_size, doc_url, promulgator, abstract,
                                     create_time, update_time]
                        page_datas.append(page_data)
                        break
                    except Exception as e:
                        retry_num += 1
                        if retry_num == 3:
                            logger.warning("failed to fetch detail page %s" % item_data)
            if len(page_datas) > 0:
                self.set_data_mysql(page_datas)
                if self.get_today == 0:
                    if len(page_datas) < 38:
                        return 0
                return 1
            else:
                return 2
        except Exception as e:
            logger.error("get detail page fail %s : %s" % (list_data, e))
            return 2

    # Batch-insert one page of rows into MySQL
    def set_data_mysql(self, page_datas):
        # Open the connection
        conn = pymysql.connect(host=self.host, port=int(self.port), user=self.user,
                               passwd=self.mysql_password, db=self.database_name)
        try:
            # Create a cursor
            cursor = conn.cursor()
            sql = "INSERT INTO report(title,sec_name,sec_code,public_time,report_type,doc_type,author," \
                  "provenance,pages,rec_rate,doc_size,doc_url,promulgator,abstract,create_time,update_time) " \
                  "VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
            effect_row = cursor.executemany(sql, page_datas)
            # Commit; without the commit nothing reaches MySQL
            conn.commit()
            logger.info("already inserted %s rows into the database" % effect_row)
        finally:
            conn.close()

    # Log in to obtain cookies; not used for now
    # def login_in(self):
    #     data = {
    #         'namelogin': self.user_name,
    #         'pwdlogin': self.password
    #     }
    #     req = requests.post(self.login_url, headers=self.headers, data=data)
    #     req.encoding = req.apparent_encoding
    #     cookies = req.cookies.get_dict()
    #     print cookies

    # http://www.microbell.com/elitelist_1_0.html is the first list page; only the "1" in the
    # URL changes for the following pages
    def process(self):
        # Walk the list pages; there are no more than 360 pages in total
        if get_today_data == 0:
            for i in range(1, 20):
                base_url = "http://www.microbell.com/elitelist_%s_0.html" % i
                logger.info("current list page url=%s" % base_url)
                end_flag = self.get_html_content(base_url)
                if end_flag == 0:
                    logger.info("The page %s is already the last page" % base_url)
                    break
        else:
            for i in reversed(range(1, 107)):
                base_url = "http://www.microbell.com/elitelist_%s_0.html" % i
                logger.info("current list page url=%s" % base_url)
                self.get_html_content(base_url)


if __name__ == "__main__":
    research_class = research_report()
    research_class.process()
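Two pieces are referenced but not shown in the post: the mysql.conf file read by ConfigParser and the report table the rows are inserted into. The sketches below are assumptions inferred from the conf.get() calls and the INSERT column list, not the author's originals; the column types and sizes in particular are guesses.

mysql.conf (assumed layout):

[mysql]
host = 127.0.0.1
port = 3306
user = root
password = change_me
database = research

Assumed report table schema:

CREATE TABLE report (
    id INT AUTO_INCREMENT PRIMARY KEY,
    title VARCHAR(512),
    sec_name VARCHAR(64),
    sec_code VARCHAR(32),
    public_time DATETIME,
    report_type VARCHAR(64),
    doc_type VARCHAR(64),
    author VARCHAR(128),
    provenance VARCHAR(128),
    pages VARCHAR(16),
    rec_rate VARCHAR(32),
    doc_size VARCHAR(32),
    doc_url VARCHAR(512),
    promulgator VARCHAR(128),
    abstract TEXT,
    create_time DATETIME,
    update_time DATETIME
) DEFAULT CHARSET = utf8;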