
Maintaining a dynamic proxy pool with Redis + Flask


When crawling the web, it is common to have your IP banned. A proxy pool can be used to manage a rotating set of proxy IPs.

Requirements for the proxy pool: crawl from multiple sites, check proxies asynchronously, filter them on a schedule, keep the pool continuously updated, and expose an API so proxies are easy to retrieve.

Proxy pool architecture: a getter, a filter (validity tester), a proxy queue, and a scheduled checker.
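As a concrete picture of the "expose an API, easy to retrieve" requirement, the sketch below shows how a crawler might pull a proxy from the pool over HTTP. The address, port 5000, and the /get endpoint are assumptions about how the Flask interface is deployed, not details taken from this post.

import requests

POOL_API = 'http://127.0.0.1:5000/get'   # assumed address of the pool's Flask API

def get_proxy():
    # Ask the pool for one usable proxy, returned as "ip:port".
    return requests.get(POOL_API).text

proxy = get_proxy()
resp = requests.get('http://httpbin.org/ip',
                    proxies={'http': 'http://' + proxy},
                    timeout=10)
print(resp.text)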

The analysis below walks through the code at https://github.com/Germey/ProxyPool/tree/master/proxypool.

The code in run.py:

from proxypool.api import app
from proxypool.schedule import Schedule

def main():

    s = Schedule()
    s.run()
    app.run()




if __name__ == '__main__':
    main()

It first starts the scheduler, then starts the API.
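The post never shows api.py. Below is a minimal sketch of what such a Flask interface could look like, built on the RedisClient from db.py shown later; the /get and /count routes are assumptions, not quoted from the repository.

from flask import Flask, g

from proxypool.db import RedisClient

app = Flask(__name__)

def get_conn():
    # Reuse one Redis connection per request context.
    if not hasattr(g, 'redis_client'):
        g.redis_client = RedisClient()
    return g.redis_client

@app.route('/get')
def get_proxy():
    # Pop one proxy from the right end of the queue.
    return get_conn().pop()

@app.route('/count')
def get_count():
    # Current number of proxies in the pool.
    return str(get_conn().queue_len)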

The scheduler code in schedule.py:

import time
from multiprocessing import Process
import asyncio
import aiohttp
try:
    from aiohttp.errors import ProxyConnectionError,ServerDisconnectedError,ClientResponseError,ClientConnectorError
except:
    from aiohttp import ClientProxyConnectionError as ProxyConnectionError, ServerDisconnectedError, ClientResponseError, ClientConnectorError
from proxypool.db import RedisClient
from proxypool.error import ResourceDepletionError
from proxypool.getter import FreeProxyGetter
from proxypool.setting import *
from asyncio import TimeoutError


class ValidityTester(object):
    test_api = TEST_API

    def __init__(self):
        self._raw_proxies = None
        self._usable_proxies = []

    def set_raw_proxies(self, proxies):
        self._raw_proxies = proxies
        self._conn = RedisClient()

    async def test_single_proxy(self, proxy):
        """
        test one proxy; if valid, put it into usable_proxies.
        """
        try:
            async with aiohttp.ClientSession() as session:
                try:
                    if isinstance(proxy, bytes):
                        proxy = proxy.decode('utf-8')
                    real_proxy = 'http://' + proxy
                    print('Testing', proxy)
                    async with session.get(self.test_api, proxy=real_proxy, timeout=get_proxy_timeout) as response:
                        if response.status == 200:
                            self._conn.put(proxy)
                            print('Valid proxy', proxy)
                except (ProxyConnectionError, TimeoutError, ValueError):
                    print('Invalid proxy', proxy)
        except (ServerDisconnectedError, ClientResponseError, ClientConnectorError) as s:
            print(s)
            pass

    def test(self):
        """
        aio test all proxies.
        """
        print('ValidityTester is working')
        try:
            loop = asyncio.get_event_loop()
            tasks = [self.test_single_proxy(proxy) for proxy in self._raw_proxies]
            loop.run_until_complete(asyncio.wait(tasks))
        except ValueError:
            print('Async Error')


class PoolAdder(object):
    """
    add proxies to the pool
    """

    def __init__(self, threshold):
        self._threshold = threshold
        self._conn = RedisClient()
        self._tester = ValidityTester()
        self._crawler = FreeProxyGetter()

    def is_over_threshold(self):
        """
        judge whether the pool size exceeds the threshold.
        """
        if self._conn.queue_len >= self._threshold:
            return True
        else:
            return False

    def add_to_queue(self):
        print('PoolAdder is working')
        proxy_count = 0
        while not self.is_over_threshold():
            for callback_label in range(self._crawler.__CrawlFuncCount__):
                callback = self._crawler.__CrawlFunc__[callback_label]
                raw_proxies = self._crawler.get_raw_proxies(callback)
                # test crawled proxies
                self._tester.set_raw_proxies(raw_proxies)
                self._tester.test()
                proxy_count += len(raw_proxies)
                if self.is_over_threshold():
                    print('IP is enough, waiting to be used')
                    break
            if proxy_count == 0:
                raise ResourceDepletionError


class Schedule(object):
    @staticmethod
    def valid_proxy(cycle=VALID_CHECK_CYCLE):
        """
        Get half of the proxies stored in redis and re-test them.
        """
        conn = RedisClient()
        tester = ValidityTester()
        while True:
            print('Refreshing ip')
            count = int(0.5 * conn.queue_len)
            if count == 0:
                print('Waiting for adding')
                time.sleep(cycle)
                continue
            raw_proxies = conn.get(count)
            tester.set_raw_proxies(raw_proxies)
            tester.test()
            time.sleep(cycle)

    @staticmethod
    def check_pool(lower_threshold=POOL_LOWER_THRESHOLD,
                   upper_threshold=POOL_UPPER_THRESHOLD,
                   cycle=POOL_LEN_CHECK_CYCLE):
        """
        If the number of proxies is less than lower_threshold, add proxies.
        """
        conn = RedisClient()
        adder = PoolAdder(upper_threshold)
        while True:
            if conn.queue_len < lower_threshold:
                adder.add_to_queue()
            time.sleep(cycle)

    def run(self):
        print('Ip processing running')
        valid_process = Process(target=Schedule.valid_proxy)
        check_process = Process(target=Schedule.check_pool)
        valid_process.start()
        check_process.start()

The scheduler's run method starts two processes. valid_process periodically takes proxies out of the database and re-tests them; check_process watches the pool size and, whenever it drops below the lower threshold, crawls new proxies from the web and puts the valid ones into the database.

valid_proxy is the periodic checker; its cycle=VALID_CHECK_CYCLE parameter defines how often the check runs. The method first creates a RedisClient() to connect to the database; that class is defined in db.py:

import redis
from proxypool.error import PoolEmptyError
from proxypool.setting import HOST, PORT, PASSWORD


class RedisClient(object):
    def __init__(self, host=HOST, port=PORT):
        if PASSWORD:
            self._db = redis.Redis(host=host, port=port, password=PASSWORD)
        else:
            self._db = redis.Redis(host=host, port=port)

    def get(self, count=1):
        """
        get proxies from redis
        """
        # fetch `count` proxies in one batch from the left end of the list
        proxies = self._db.lrange("proxies", 0, count - 1)
        self._db.ltrim("proxies", count, -1)
        return proxies

    def put(self, proxy):
        """
        add proxy to the right end
        """
        self._db.rpush("proxies", proxy)

    def pop(self):
        """
        get a proxy from the right end.
        """
        try:
            return self._db.rpop("proxies").decode('utf-8')
        except:
            raise PoolEmptyError

    @property
    def queue_len(self):
        """
        get the length of the queue.
        """
        return self._db.llen("proxies")

    def flush(self):
        """
        flush db
        """
        self._db.flushall()


if __name__ == '__main__':
    conn = RedisClient()
    print(conn.pop())

The scheduler also instantiates ValidityTester(), which checks whether proxies are usable; its test_single_proxy() method is the key to the asynchronous checking.
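To make the asynchronous part easier to see in isolation, here is a stripped-down sketch of the same aiohttp pattern, with the Redis write replaced by a print and asyncio.gather/asyncio.run used in place of the event-loop handling in schedule.py. The test URL and proxy addresses are made up for illustration.

import asyncio
import aiohttp

TEST_URL = 'http://www.baidu.com'   # assumption: any stable page can serve as the probe

async def check(proxy):
    # One coroutine per proxy; aiohttp routes the request through that proxy.
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(TEST_URL, proxy='http://' + proxy, timeout=10) as resp:
                if resp.status == 200:
                    print('Valid proxy', proxy)
    except Exception:
        print('Invalid proxy', proxy)

async def check_all(proxies):
    # gather() runs all checks concurrently on one event loop,
    # so slow or dead proxies do not block each other.
    await asyncio.gather(*(check(p) for p in proxies))

asyncio.run(check_all(['127.0.0.1:8888', '10.0.0.1:3128']))   # made-up addresses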

check_pool() takes three parameters: the lower and upper bounds of the pool, and a check interval. It relies on PoolAdder's add_to_queue() method.
add_to_queue() uses FreeProxyGetter(), the class that scrapes IPs from free-proxy websites, defined in getter.py:
from .utils import get_page
from pyquery import PyQuery as pq
import re


class ProxyMetaclass(type):
    """
        Metaclass that adds two attributes, __CrawlFunc__ and
        __CrawlFuncCount__, to FreeProxyGetter: the list of crawl
        functions and the number of crawl functions respectively.
    """

    def __new__(cls, name, bases, attrs):
        count = 0
        attrs['__CrawlFunc__'] = []
        for k, v in attrs.items():
            if 'crawl_' in k:
                attrs['__CrawlFunc__'].append(k)
                count += 1
        attrs['__CrawlFuncCount__'] = count
        return type.__new__(cls, name, bases, attrs)


class FreeProxyGetter(object, metaclass=ProxyMetaclass):
    def get_raw_proxies(self, callback):
        proxies = []
        print('Callback', callback)
        for proxy in eval("self.{}()".format(callback)):
            print('Getting', proxy, 'from', callback)
            proxies.append(proxy)
        return proxies

    def crawl_ip181(self):
        start_url = 'http://www.ip181.com/'
        html = get_page(start_url)
        ip_adress = re.compile('<tr.*?>\s*<td>(.*?)</td>\s*<td>(.*?)</td>')
        # \s* matches whitespace so the pattern can span line breaks
        re_ip_adress = ip_adress.findall(html)
        for adress, port in re_ip_adress:
            result = adress + ':' + port
            yield result.replace(' ', '')

    def crawl_kuaidaili(self):
        for page in range(1, 4):
            # domestic high-anonymity proxies
            start_url = 'https://www.kuaidaili.com/free/inha/{}/'.format(page)
            html = get_page(start_url)
            ip_adress = re.compile(
                '<td data-title="IP">(.*)</td>\s*<td data-title="PORT">(\w+)</td>'
            )
            re_ip_adress = ip_adress.findall(html)
            for adress, port in re_ip_adress:
                result = adress + ':' + port
                yield result.replace(' ', '')

    def crawl_xicidaili(self):
        for page in range(1, 4):
            start_url = 'http://www.xicidaili.com/wt/{}'.format(page)
            html = get_page(start_url)
            ip_adress = re.compile(
                '<td class="country"><img src="http://fs.xicidaili.com/images/flag/cn.png" alt="Cn" /></td>\s*<td>(.*?)</td>\s*<td>(.*?)</td>'
            )
            # \s* matches whitespace so the pattern can span line breaks
            re_ip_adress = ip_adress.findall(html)
            for adress, port in re_ip_adress:
                result = adress + ':' + port
                yield result.replace(' ', '')

    def crawl_daili66(self, page_count=4):
        start_url = 'http://www.66ip.cn/{}.html'
        urls = [start_url.format(page) for page in range(1, page_count + 1)]
        for url in urls:
            print('Crawling', url)
            html = get_page(url)
            if html:
                doc = pq(html)
                trs = doc('.containerbox table tr:gt(0)').items()
                for tr in trs:
                    ip = tr.find('td:nth-child(1)').text()
                    port = tr.find('td:nth-child(2)').text()
                    yield ':'.join([ip, port])

    def crawl_data5u(self):
        for i in ['gngn', 'gnpt']:
            start_url = 'http://www.data5u.com/free/{}/index.shtml'.format(i)
            html = get_page(start_url)
            ip_adress = re.compile(
                '<ul class="l2">\s*<span><li>(.*?)</li></span>\s*<span style="width: 100px;"><li class=".*">(.*?)</li></span>'
            )
            # \s* matches whitespace so the pattern can span line breaks
            re_ip_adress = ip_adress.findall(html)
            for adress, port in re_ip_adress:
                result = adress + ':' + port
                yield result.replace(' ', '')

    def crawl_kxdaili(self):
        for i in range(1, 4):
            start_url = 'http://www.kxdaili.com/ipList/{}.html#ip'.format(i)
            html = get_page(start_url)
            ip_adress = re.compile('<tr.*?>\s*<td>(.*?)</td>\s*<td>(.*?)</td>')
            # \s* matches whitespace so the pattern can span line breaks
            re_ip_adress = ip_adress.findall(html)
            for adress, port in re_ip_adress:
                result = adress + ':' + port
                yield result.replace(' ', '')

    def crawl_premproxy(self):
        for i in ['China-01', 'China-02', 'China-03', 'China-04', 'Taiwan-01']:
            start_url = 'https://premproxy.com/proxy-by-country/{}.htm'.format(
                i)
            html = get_page(start_url)
            if html:
                ip_adress = re.compile('<td data-label="IP:port ">(.*?)</td>')
                re_ip_adress = ip_adress.findall(html)
                for adress_port in re_ip_adress:
                    yield adress_port.replace(' ', '')

    def crawl_xroxy(self):
        for i in ['CN', 'TW']:
            start_url = 'http://www.xroxy.com/proxylist.php?country={}'.format(
                i)
            html = get_page(start_url)
            if html:
                ip_adress1 = re.compile(
                    "title='View this Proxy details'>\s*(.*).*")
                re_ip_adress1 = ip_adress1.findall(html)
                ip_adress2 = re.compile(
                    "title='Select proxies with port number .*'>(.*)</a>")
                re_ip_adress2 = ip_adress2.findall(html)
                for adress, port in zip(re_ip_adress1, re_ip_adress2):
                    adress_port = adress + ':' + port
                    yield adress_port.replace(' ', '')
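For reference, this is roughly how the metaclass attributes get consumed; it mirrors what PoolAdder.add_to_queue() already does in schedule.py and is only a usage sketch.

# Enumerate every crawl_* method that ProxyMetaclass registered,
# and pull raw proxies from each source in turn.
crawler = FreeProxyGetter()
for i in range(crawler.__CrawlFuncCount__):
    callback = crawler.__CrawlFunc__[i]
    for proxy in crawler.get_raw_proxies(callback):
        print(callback, '->', proxy)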
See the GitHub repository for detailed usage instructions.
