1. 程式人生 > >python:多執行緒抓取西刺和快站 高匿代理IP

python:多執行緒抓取西刺和快站 高匿代理IP

  一開始是打算去抓取一些資料,但是總是訪問次數多了之後被封IP,所以做了一個專門做了個工具用來抓取在西刺和快站的高匿IP。

  執行環境的話是在python3.5下執行的,需要requests庫

  在製作的過程中也參考的以下網上其他人的做法,但是發現很大一部分都不是多執行緒去抓取有點浪費時間了,又或者或網上已經有人做好了輪子了,但是現在的技術還有點看不懂,所以就做了這隻在一個py檔案上執行的代理池。

  對於舊IP的處理,我這下面的程式碼是沒有讓他執行的,如果要執行可以開啟,不過必須要在同一個資料夾裡建立一個名為“old_ip.txt”的文件,本來可以做一個自動判定,沒有就可以生成的,到最後又沒弄了。

  有一點是要重點注意一下的就是:不建議一下子抓取超過10頁,因為我沒有設定request訪問限定,西刺或者快站都會因為你訪問速度太快而判定你為爬蟲封24小時的IP......我已經嘗試過。 

  以下是程式碼:

# -*- coding: UTF-8 -*-
import threading, requests, datetime
from bs4 import BeautifulSoup
import random
import queue

# ------------------------------IP多執行緒設定--------------------------
class
Mythread(threading.Thread): # def __init__(self, ip, path, url, type="new_ip"): super(Mythread, self).__init__() self.ip = ip self.path = path self.url = url self.type = type def run(self): if self.type == "new_ip": if semaphoer.acquire(): target1
= check_ip(self.ip, self.url) if target1 == True: write(self.ip, self.path) all_IP.add(self.ip) print("這個ip可以使用", self.ip) semaphoer.release() else: if semaphoer.acquire(): target2 = check_ip(self.ip, self.url) if target2 == True: all_IP.add(self.ip) print("這個舊IP可使用", self.ip) semaphoer.release() # ------------------------------------時間計算----------------------------------------- def cost(start, end): seconds = (end - start).seconds m, s = divmod(seconds, 60) h, m = divmod(m, 60) cost_time = ("%s:%s:%s" % (h, m, s)) return cost_time # -----------------隨機選擇請求頭引數-------------------- def getheaders(): user_agent_list = [ "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1" "Mozilla/5.0 (X11; CrOS i686 2268.111.0) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.57 Safari/536.11", "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.6 (KHTML, like Gecko) Chrome/20.0.1092.0 Safari/536.6", "Mozilla/5.0 (Windows NT 6.2) AppleWebKit/536.6 (KHTML, like Gecko) Chrome/20.0.1090.0 Safari/536.6", "Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/19.77.34.5 Safari/537.1", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/536.5 (KHTML, like Gecko) Chrome/19.0.1084.9 Safari/536.5", "Mozilla/5.0 (Windows NT 6.0) AppleWebKit/536.5 (KHTML, like Gecko) Chrome/19.0.1084.36 Safari/536.5", "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1063.0 Safari/536.3", "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1063.0 Safari/536.3", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_8_0) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1063.0 Safari/536.3", "Mozilla/5.0 (Windows NT 6.2) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1062.0 Safari/536.3", "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1062.0 Safari/536.3", "Mozilla/5.0 (Windows NT 6.2) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1061.1 Safari/536.3", "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1061.1 Safari/536.3", "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1061.1 Safari/536.3", "Mozilla/5.0 (Windows NT 6.2) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1061.0 Safari/536.3", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/535.24 (KHTML, like Gecko) Chrome/19.0.1055.1 Safari/535.24", "Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/535.24 (KHTML, like Gecko) Chrome/19.0.1055.1 Safari/535.24" ] UserAgent = random.choice(user_agent_list) headers = {"User-Agent": UserAgent} return headers # --------------------併發IP驗證------------------------------- def check_ip(ip, url): header = getheaders() proxies = {"http": "http://" + ip, "https": "http://" + ip} print("開始測試這個ip", ip) # 實時反映測試IP,不希望被IP刷屏 可關閉。 try: response = requests.get(url=url, proxies=proxies, headers=header, timeout=5).status_code # 定為5秒內有響應即為可用ip if response == 200: return True else: return False except: return False # ---------------------清空函式----------------------------- def clearing_txt(path): with open(path, 'w', encoding="utf-8") as f: f.truncate() # --------------------讀取函式(並返回一個列表)------------------- def read_txt(path): txt = [] with open(path, "r", encoding="utf-8") as h: for line in h.readlines(): txt.append(line.strip()) return txt # --------------------寫入函式---------------------------- def write(ip, path): with open(path, "a", encoding="utf8") as f: f.writelines(ip) f.write("\n") # --------------------------阻塞主執行緒------------------------- def join(list): for i in list: i.start() for i in list: i.join() # -------------------------網站爬蟲抓取IP--------------------------------- def call_net(num, pagenum): scrapy_url = {1: 'https://www.kuaidaili.com/free/inha/', # 快代理 2: 'http://www.xicidaili.com/nn/' # 西刺 } get_url = scrapy_url[num] + str(pagenum) # URL組合 header = getheaders() # 請求頭 html = requests.get(url=get_url, headers=header, timeout=6).text # 訪問並提取網頁原始碼 soup = BeautifulSoup(html, 'lxml') # 格式化 if num == 2: all = soup.find_all("tr", class_="odd") # 西刺代理的網站原始碼IP有兩種不同的tr標籤,所以要兩種方法處理 all2 = soup.find_all("tr", class_="") for i in all: t = i.find_all('td') ip_1 = t[1].text + ':' + t[2].text q.put(ip_1) # 放入IP佇列 for h in all2: x = h.find_all('td') if x == []: continue ip_2 = x[1].text + ':' + x[2].text q.put(ip_2) # 放入IP佇列 else: all_ip = soup.find_all(attrs={"data-title": "IP"}) # 快代理 IP 和 port 都是不同的標籤引數,所以要用兩種方法處理 all_port = soup.find_all(attrs={"data-title": "PORT"}) all = len(all_ip) for i in range(all): ip_3 = all_ip[i].text + ":" + all_port[i].text q.put(ip_3) # 放入IP佇列 # ---------------------------獲取IP主程式------------------------------ def get_ip(url, path, path_old, page): clearing_txt(path) # 清空檔案內容 threads = [] # 爬蟲執行緒列表 threads_ip = [] # IP執行緒測試列表 threads_old_ip = [] # 舊IP執行緒測試列表 start_time = datetime.datetime.now() # 記錄開始時間 for num in range(2): for page_num in range(page): net_threads = threading.Thread(target=call_net, args=(num + 1, page_num + 1)) # 建立爬蟲執行緒進行訪問 threads.append(net_threads) print("開始抓取西刺、快站的高匿代理") join(threads) # print("測試舊儲存IP") # for i in old_IP_list: # threads_old_ip.append(Mythread(i,path,url,type="old")) #建立舊IP執行緒進行訪問 # join(threads_old_ip) # print("一共%s箇舊IP可使用" %len(all_IP)) while not q.empty(): # 提取q佇列內的IP,並建立IP測試執行緒 i = q.get() threads_ip.append(Mythread(i, path, url, )) # 建立新IP執行緒進行訪問 join(threads_ip) print("成功爬取") end_time = datetime.datetime.now() # 記錄結束時間 cost_time = cost(start_time, end_time) clearing_txt(path_old) for all_ip in all_IP: # 所有IP寫入 write(all_ip, path_old) new_ip = read_txt(path) old_ip = read_txt(path_old) print("耗時:%s 一共抓取:%s個新IP 以儲存:%s個IP" % (cost_time, len(new_ip), len(old_ip)))

'''
1、抓取西刺代理、快代理的ip
2、提取之前儲存的ip,重新驗證ip池內ip的可用性
3、使用ip去訪問指定網站 即:url,設定其響應時間為5秒,並且要在規定時間內返回200,即為可用IP 。
4、old_path為IP地址堆疊處,儲存舊IP。
4、最後都儲存到指定資料夾
'''


# --------------------------主要引數配置點------------------------------------ if __name__ == '__main__': semaphoer = threading.Semaphore(15) # 執行緒池數量大小,預設為15 q = queue.Queue() # 建立佇列 page = 2 # 爬取西刺和快代理的頁數 不能設定過大 path = "ip.txt" # 設定新爬取IP儲存的檔名 path_old = "ip_old.txt" # 設定舊IP儲存點 url = "https://www.baidu.com/" # 設定驗證IP可用性的url引數 # old_IP_list = read_txt(path_old) #舊IP提取 all_IP = set() # 全IP集合 get_ip(url, path, path_old, page)