從西刺代理爬取代理ip,並驗證是否可用
阿新 • 發佈:2019-02-01
最近又重新拾起了久違的爬蟲,寫了一個代理ip的爬取,驗證和儲存器。
1.爬取網站是西刺代理,使用了requests+beautifulsoup庫
2.驗證的網站使用了京東和淘寶的首頁,用了urllib+beautifulsoup庫
3.將爬取並驗證後的代理ip存入本地的資料庫中,這裡使用的是sql server 2008,用的是pyodbc庫
4.驗證的時候開了20個執行緒,用了python裡的threading庫
5.定期從庫中拿出代理ip,將失效的ip刪除
爬取程式碼:
# -*- coding: utf-8 -*-
"""Scrape proxy IPs from xicidaili, validate each one against the JD and
Taobao front pages, and store the working proxies in a local SQL Server
database (database 'ip_save', table 'ip').

Ported to Python 3: the py2-only ``reload(sys)`` encoding hack,
``print`` statements, ``except Exception, e`` and the removed
``urllib.urlopen(..., proxies=...)`` API are gone; validation now uses
``requests`` (already a dependency of this script) with a proxies dict.
"""
import socket
import threading
import time

import pyodbc
import requests
from bs4 import BeautifulSoup

# Listing pages to scrape (page 1 only by default; widen the range for more).
TARGET_URLS = ['http://www.xicidaili.com/nn/%d' % i for i in range(1, 2)]

# Shared result lists, appended to by worker threads — guarded by _LOCK.
all_message = []   # [ip, port] pairs scraped from the listing pages
aim_ip = []        # (ip, port, elapsed_seconds) tuples that passed validation
_LOCK = threading.Lock()


class ipGet(threading.Thread):
    """Worker thread that scrapes one proxy-listing page."""

    def __init__(self, target):
        threading.Thread.__init__(self)
        self.target = target  # URL of the listing page to scrape

    def Get_ip(self):
        """Fetch the page and append every [ip, port] row to all_message."""
        headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.11 '
                                 '(KHTML, like Gecko) Chrome/23.0.1271.64 Safari/537.11'}
        html = requests.get(self.target, headers=headers)
        # Name the parser explicitly: the original bare BeautifulSoup(...)
        # call picks whatever parser is installed, which varies by machine.
        soup = BeautifulSoup(html.text, 'html.parser')
        trs = soup.find('table', id='ip_list').find_all('tr')
        for tr in trs[1:]:  # trs[0] is the header row
            tds = tr.find_all('td')
            ip = tds[1].text.strip()
            opening = tds[2].text.strip()  # port number, as text
            with _LOCK:  # list.append from many threads needs a lock
                all_message.append([ip, opening])

    def run(self):
        self.Get_ip()


class ipCheck(threading.Thread):
    """Worker thread that validates a slice of the scraped proxies."""

    def __init__(self, ipList):
        threading.Thread.__init__(self)
        self.ipList = ipList   # slice of all_message: [ip, port] pairs
        self.timeout = 6       # per-request timeout in seconds
        self.test_url = 'http://www.jd.com/?cu=true&utm_source=click.linktech.cn&utm_medium=tuiguang&utm_campaign=t_4_A100220955&utm_term=7e7c13a102664ab3a6886ccefa66d930&abt=3'
        self.another_url = 'https://www.taobao.com/'

    def Check_ip(self):
        """Fetch both test pages through each proxy; keep the ones that work."""
        socket.setdefaulttimeout(3)
        for ip in self.ipList:
            try:
                proxy_host = 'http://' + ip[0] + ':' + ip[1]
                proxy_temp = {'http': proxy_host}
                t_start = time.time()
                res = requests.get(self.test_url, proxies=proxy_temp,
                                   timeout=self.timeout).text
                res2 = requests.get(self.another_url, proxies=proxy_temp,
                                    timeout=self.timeout).text
                t_use = time.time() - t_start  # total round-trip for both pages
                soup = BeautifulSoup(res, 'html.parser')
                soup2 = BeautifulSoup(res2, 'html.parser')
                # Both real front pages carry a <link rel="dns-prefetch">;
                # its absence means the proxy served an error/garbage page.
                ans = soup.find('link', rel='dns-prefetch')
                ans2 = soup2.find('link', rel='dns-prefetch')
                if ans is not None and ans2 is not None:
                    with _LOCK:
                        aim_ip.append((ip[0], ip[1], t_use))
            except Exception as e:
                # Dead proxies simply raise (timeout, refused, ...):
                # log and continue with the next candidate.
                print(e)

    def run(self):
        self.Check_ip()


class save_csv:
    """Persist the validated proxies into the SQL Server 'ip' table."""

    def __init__(self, SaveList):
        self.driver = '{SQL Server}'
        self.server = '(local)'
        self.database = 'ip_save'
        self.savelist = SaveList  # (ip, port, elapsed) tuples to store

    def Save_ip(self):
        """Insert proxies not yet in the table; return how many already existed."""
        base = pyodbc.connect(DRIVER=self.driver, SERVER=self.server,
                              DATABASE=self.database)
        try:
            source = base.cursor()
            counts = 0
            for each in self.savelist:
                # Parameterized queries replace the original %-interpolated
                # SQL, which was injection-prone and broke on quote characters.
                aim = source.execute("select * from ip where ips=?", each[0])
                if aim.fetchone() is None:
                    source.execute("Insert into ip values(?,?,?)",
                                   each[0], each[1], str(each[2]))
                else:
                    print("The ip: '%s' is exist!" % each[0])
                    counts += 1
            base.commit()
            source.close()
        finally:
            # The original leaked the connection if any execute raised.
            base.close()
        return counts


def _chunks(seq, parts):
    """Split seq into `parts` contiguous slices (trailing ones may be empty)."""
    size = (len(seq) + parts - 1) // parts  # ceil-division chunk size
    return [seq[size * i:size * (i + 1)] for i in range(parts)]


if __name__ == '__main__':
    # Phase 1: scrape every listing page in parallel.
    GetThreading = [ipGet(url) for url in TARGET_URLS]
    for t in GetThreading:
        t.start()
        print(t.is_alive())
    for t in GetThreading:
        t.join()
    print('@' * 3 + ' ' * 2 + "總共抓取了%s個代理" % len(all_message) + ' ' * 2 + '@' * 3)

    # Phase 2: validate the scraped proxies with 20 worker threads.
    CheckThreading = [ipCheck(part) for part in _chunks(all_message, 20)]
    for t in CheckThreading:
        t.start()
        print(t.is_alive())
    for t in CheckThreading:
        t.join()
    print('@' * 3 + ' ' * 2 + "總共有%s個代理通過校驗" % len(aim_ip) + ' ' * 2 + '@' * 3)

    # Phase 3: store the survivors, skipping duplicates.
    saver = save_csv(aim_ip)
    counts = saver.Save_ip()
    print('@' * 3 + ' ' * 2 + "總共新增%s個代理" % (len(aim_ip) - counts) + ' ' * 2 + '@' * 3)
定期驗證:
# -*- coding: utf-8 -*-
"""Periodic re-validation: load every stored proxy from SQL Server,
re-test it against the JD and Taobao front pages, delete the dead ones
and refresh the recorded timing of the live ones.

Ported to Python 3 (print function, ``except ... as e``, true-division
fix for the chunk size) and switched from the removed
``urllib.urlopen(..., proxies=...)`` API to ``requests``.
"""
import socket
import threading
import time

import pyodbc
import requests
from bs4 import BeautifulSoup


class Get_ip_sql:
    """Read all stored proxies from the 'ip' table."""

    def __init__(self):
        self.driver = '{SQL Server}'
        self.server = '(local)'
        self.database = 'ip_save'

    def Get(self):
        """Return (rows, row_count) for the whole ip table."""
        base = pyodbc.connect(DRIVER=self.driver, SERVER=self.server,
                              DATABASE=self.database)
        try:
            source = base.cursor()
            check_list = list(source.execute("Select * from ip"))
            # len() of the rows we just fetched replaces the original's
            # second COUNT(*) round-trip and cannot drift from the list.
            return check_list, len(check_list)
        finally:
            base.close()  # the original never closed this connection


class Check_ip_intime(threading.Thread):
    """Worker thread that re-validates a slice of stored proxies."""

    def __init__(self, CheckList):
        threading.Thread.__init__(self)
        self.checklist = CheckList  # slice of rows: (ip, port, time_used, ...)
        self.driver = '{SQL Server}'
        self.server = '(local)'
        self.database = 'ip_save'
        self.test_url = 'http://www.jd.com/?cu=true&utm_source=click.linktech.cn&utm_medium=tuiguang&utm_campaign=t_4_A100220955&utm_term=7e7c13a102664ab3a6886ccefa66d930&abt=3'
        self.another_url = 'https://www.taobao.com/'

    def Work(self):
        """Re-test each proxy; DELETE dead rows, UPDATE timings of live ones."""
        base = pyodbc.connect(DRIVER=self.driver, SERVER=self.server,
                              DATABASE=self.database)
        try:
            source = base.cursor()
            socket.setdefaulttimeout(3)
            for each in self.checklist:
                try:
                    proxy_host = 'http://' + each[0] + ':' + str(each[1])
                    proxy_temp = {'http': proxy_host}
                    t_start = time.time()
                    res = requests.get(self.test_url, proxies=proxy_temp,
                                       timeout=6).text
                    res2 = requests.get(self.another_url, proxies=proxy_temp,
                                        timeout=6).text
                    t_use = str(time.time() - t_start)
                    soup = BeautifulSoup(res, 'html.parser')
                    soup2 = BeautifulSoup(res2, 'html.parser')
                    # Same liveness heuristic as the scraper: real front
                    # pages carry a <link rel="dns-prefetch"> element.
                    ans = soup.find('link', rel='dns-prefetch')
                    ans2 = soup2.find('link', rel='dns-prefetch')
                    if ans is None or ans2 is None:
                        # Parameterized SQL replaces the original's
                        # injection-prone %-interpolation.
                        source.execute("Delete from ip where ips = ?", each[0])
                    else:
                        source.execute(
                            "Update ip set time_used = ? where ips = ?",
                            t_use, each[0])
                        print(each[0])
                except Exception as e:
                    # Any network failure means the proxy is dead: drop it.
                    source.execute("Delete from ip where ips = ?", each[0])
                    print(e)
            base.commit()
        finally:
            base.close()

    def run(self):
        self.Work()


class Count_ip:
    """Count how many proxies remain in the table."""

    def __init__(self):
        self.driver = '{SQL Server}'
        self.server = '(local)'
        self.database = 'ip_save'

    def Compute(self):
        """Return the current row count of the ip table."""
        base = pyodbc.connect(DRIVER=self.driver, SERVER=self.server,
                              DATABASE=self.database)
        try:
            row = base.cursor().execute("Select count(*) from ip").fetchone()
            return row[0]
        finally:
            base.close()


if __name__ == '__main__':
    check, counts = Get_ip_sql().Get()
    # Ceil-divide the rows into 5 slices; '//' keeps this an int under py3
    # (the original '/' would have produced float slice indices).
    size = (counts + 4) // 5
    CheckThreading = [Check_ip_intime(check[size * i:size * (i + 1)])
                      for i in range(5)]
    for t in CheckThreading:
        t.start()
        print(t.is_alive())
    for t in CheckThreading:
        t.join()
    ans = Count_ip().Compute()
    print('@' * 3 + ' ' * 2 + "總共刪除了%s個失效代理" % (counts - ans) + ' ' * 2 + '@' * 3)
    print('@' * 3 + ' ' * 2 + "剩餘%s個代理" % ans + ' ' * 2 + '@' * 3)