1. 程式人生 > >搭建簡單JAVA分散式爬蟲系統

搭建簡單JAVA分散式爬蟲系統

先貼一張架構圖 

整體架構分三個部分:

排程器        :分配任務

爬蟲            :爬取資料並儲存

監控系統     :檢視爬蟲狀態(主要作用是某個節點down掉了可以今早發現,雖然不影響整體穩定性,但是影響爬蟲效率)

爬蟲部分

爬蟲系統是一個獨立執行的程序,我們把我們的爬蟲系統打包成 jar 包,然後分發到不同的節點上執行,這樣並行爬取資料可以提高爬蟲的效率。(爬蟲原始碼分為:ip池->下載頁面->解析頁面->儲存  四部分)

IP池

加入隨機 IP 代理主要是為了反反爬蟲,因此如果有一個 IP 代理庫,並且可以在構建 http 客戶端時隨機地使用不同的代理,那麼對我們進行反反爬蟲會有很大的幫助。在系統中使用 IP 代理庫,需要先在文字檔案中新增可用的代理地址資訊:

    # IPProxyRepository.txt 
    58.60.255.104:8118 
    219.135.164.245:3128 
    27.44.171.27:9999 
    219.135.164.245:3128 
    58.60.255.104:8118 
    58.252.6.165:9000 
    ...... 

需要注意的是,上面的代理 IP 是我在西刺代理上拿到的一些代理 IP,不一定可用,建議是自己花錢購買一批代理 IP,這樣可以節省很多時間和精力去尋找代理 IP。然後在構建 http 客戶端的工具類中,當第一次使用工具類時,會把這些代理 IP 載入進記憶體中,載入到 Java 的一個 HashMap:

    // IP地址代理庫Map 
    private static Map<String, Integer> IPProxyRepository = new HashMap<>(); 
    private static String[] keysArray = null;   // keysArray是為了方便生成隨機的代理物件 
     
    /** 
         * 初次使用時使用靜態程式碼塊將IP代理庫載入進set中 
         */ 
    static { 
        InputStream in = HttpUtil.class.getClassLoader().getResourceAsStream("IPProxyRepository.txt");  // 載入包含代理IP的文字 
        // 構建緩衝流物件 
        InputStreamReader isr = new InputStreamReader(in); 
        BufferedReader bfr = new BufferedReader(isr); 
        String line = null; 
        try { 
            // 迴圈讀每一行,新增進map中 
            while ((line = bfr.readLine()) != null) { 
                String[] split = line.split(":");   // 以:作為分隔符,即文字中的資料格式應為192.168.1.1:4893 
                String host = split[0]; 
                int port = Integer.valueOf(split[1]); 
                IPProxyRepository.put(host, port); 
            } 
            Set<String> keys = IPProxyRepository.keySet(); 
            keysArray = keys.toArray(new String[keys.size()]);  // keysArray是為了方便生成隨機的代理物件 
        } catch (IOException e) { 
            e.printStackTrace(); 
        } 
     
    } 

之後,在每次構建 http 客戶端時,都會先到 map 中看是否有代理 IP,有則使用,沒有則不使用代理:

    CloseableHttpClient httpClient = null; 
    HttpHost proxy = null; 
    if (IPProxyRepository.size() > 0) {  // 如果ip代理地址庫不為空,則設定代理 
        proxy = getRandomProxy(); 
        httpClient = HttpClients.custom().setProxy(proxy).build();  // 建立httpclient物件 
    } else { 
        httpClient = HttpClients.custom().build();  // 建立httpclient物件 
    } 
    HttpGet request = new HttpGet(url); // 構建htttp get請求 
    ...... 

隨機代理物件則通過下面的方法生成:

    /** 
         * 隨機返回一個代理物件 
         * 
         * @return 
         */ 
    public static HttpHost getRandomProxy() { 
        // 隨機獲取host:port,並構建代理物件 
        Random random = new Random(); 
        String host = keysArray[random.nextInt(keysArray.length)]; 
        int port = IPProxyRepository.get(host); 
        HttpHost proxy = new HttpHost(host, port);  // 設定http代理 
        return proxy; 
    } 

這樣,通過上面的設計,基本就實現了隨機 IP 代理器的功能,當然,其中還有很多可以完善的地方。比如,當使用這個 IP 代理而請求失敗時,是否可以把這一情況記錄下來;當超過一定次數時,再將其從代理庫中刪除,同時生成日誌供開發人員或運維人員參考,這是完全可以實現的,不過我就不做這一步功能了。

下載

import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;

import java.io.IOException;

/**
 * Title:           Httpclient
 * Description:     採用httpclient獲取htmlpage,方便使用代理
 * Company:         AceGear
 * Author:          henrywang
 * Date:            2018/6/23
 * JDK:             8
 * Encoding:        UTF-8
 */
public class Httpclient {
    private static RequestConfig requestConfig = RequestConfig.custom().setProxy(new HttpHost("49.79.156.117", 8000)).setSocketTimeout(15000).setConnectTimeout(15000)
            .setConnectionRequestTimeout(15000).build();
    static CloseableHttpClient closeableHttpClient = null;
    static CloseableHttpResponse closeableHttpResponse = null;
    static HttpEntity httpEntity = null;

    public Document getDocument(String httpUrl) {
        closeableHttpClient = HttpClients.createDefault();
        HttpGet httpGet = new HttpGet(httpUrl);
        //HttpHost proxy = new HttpHost("39.137.69.10",80);
        httpGet.setHeader("User-Agent", "Mozilla/5.0(Windows NT 6.1; rv:6.0.2) Gecko/20100101 Firefox/6.0.2");
        httpGet.setConfig(requestConfig);
        String responseContent = "";
        try {
            closeableHttpResponse = closeableHttpClient.execute(httpGet);
            httpEntity = closeableHttpResponse.getEntity();
            responseContent = EntityUtils.toString(httpEntity, "UTF-8");
        } catch (IOException e) {
            e.printStackTrace();
        }
        Document document = Jsoup.parse(responseContent);
        return document;


    }
}

解析

網頁解析器就是把下載的網頁中我們感興趣的資料解析出來,並儲存到某個物件中,供資料儲存器進一步處理以儲存到不同的持久化倉庫中,其基於下面的介面進行開發:

    /** 
     * 網頁資料解析 
     */ 
    public interface IParser { 
        public void parser(Page page); 
    } 

網頁解析器在整個系統的開發中也算是比較重頭戲的一個元件,功能不復雜,主要是程式碼比較多,針對不同的商城不同的商品,對應的解析器可能就不一樣了。

因此需要針對特別的商城的商品進行開發,因為很顯然,京東用的網頁模板跟蘇寧易購的肯定不一樣,天貓用的跟京東用的也肯定不一樣。

所以這個完全是看自己的需要來進行開發了,只是說,在解析器開發的過程當中會發現有部分重複程式碼,這時就可以把這些程式碼抽象出來開發一個工具類了。

目前在系統中爬取的是京東和蘇寧易購的手機商品資料,因此就寫了這兩個實現類:

    /** 
     * 解析京東商品的實現類 
     */ 
    public class JDHtmlParserImpl implements IParser { 
        ...... 
    } 
     
    /** 
     * 蘇寧易購網頁解析 
     */ 
    public class SNHtmlParserImpl implements IParser { 
        ...... 
    } 

儲存

資料儲存器主要是將網頁解析器解析出來的資料物件儲存到不同的表格,而對於本次爬取的手機商品,資料物件是下面一個 Page 物件:

    /** 
     * 網頁物件,主要包含網頁內容和商品資料 
     */ 
    public class Page { 
        private String content;              // 網頁內容 
     
        private String id;                    // 商品Id 
        private String source;               // 商品來源 
        private String brand;                // 商品品牌 
        private String title;                // 商品標題 
        private float price;                // 商品價格 
        private int commentCount;        // 商品評論數 
        private String url;                  // 商品地址 
        private String imgUrl;             // 商品圖片地址 
        private String params;              // 商品規格引數 
     
        private List<String> urls = new ArrayList<>();  // 解析列表頁面時用來儲存解析的商品url的容器 
    } 

對應的,在 MySQL 中,表資料結構如下:

    -- ---------------------------- 
    -- Table structure for phone 
    -- ---------------------------- 
    DROP TABLE IF EXISTS `phone`; 
    CREATE TABLE `phone` ( 
      `id` varchar(30) CHARACTER SET armscii8 NOT NULL COMMENT '商品id', 
      `source` varchar(30) NOT NULL COMMENT '商品來源,如jd suning gome等', 
      `brand` varchar(30) DEFAULT NULL COMMENT '手機品牌', 
      `title` varchar(255) DEFAULT NULL COMMENT '商品頁面的手機標題', 
      `price` float(10,2) DEFAULT NULL COMMENT '手機價格', 
      `comment_count` varchar(30) DEFAULT NULL COMMENT '手機評論', 
      `url` varchar(500) DEFAULT NULL COMMENT '手機詳細資訊地址', 
      `img_url` varchar(500) DEFAULT NULL COMMENT '圖片地址', 
      `params` text COMMENT '手機引數,json格式儲存', 
      PRIMARY KEY (`id`,`source`) 
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8; 

而在 HBase 中的表結構則為如下:

    ## cf1 儲存 id source price comment brand url 
    ## cf2 儲存 title params imgUrl 
    create 'phone', 'cf1', 'cf2' 
     
    ## 在HBase shell中檢視建立的表 
    hbase(main):135:0> desc 'phone' 
    Table phone is ENABLED                                                                                                 
    phone                                                                                                                  
    COLUMN FAMILIES DESCRIPTION                                                                                            
    {NAME => 'cf1', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK 
    _ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE =>  
    '65536', REPLICATION_SCOPE => '0'}                                                                                     
    {NAME => 'cf2', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK 
    _ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE =>  
    '65536', REPLICATION_SCOPE => '0'}                                                                                     
    2 row(s) in 0.0350 seconds 

即在 HBase 中建立了兩個列族,分別為 cf1、cf2,其中 cf1 用來儲存 id source price comment brand url 欄位資訊;cf2 用來儲存 title params imgUrl 欄位資訊。

不同的資料儲存用的是不同的實現類,但是其都是基於下面同一個介面開發的:

    /** 
     * 商品資料的儲存 
     */ 
    public interface IStore { 
        public void store(Page page); 
    } 

然後基於此開發了 MySQL 的儲存實現類、HBase 的儲存實現類還有控制檯的輸出實現類,如 MySQL 的儲存實現類,其實就是簡單的資料插入語句:

    /** 
     * 使用dbc資料庫連線池將資料寫入mysql表中 
     */ 
    public class MySQLStoreImpl implements IStore { 
        private QueryRunner queryRunner = new QueryRunner(DBCPUtil.getDataSource()); 
     
        @Override 
        public void store(Page page) { 
            String sql = "insert into phone(id, source, brand, title, price, comment_count, url, img_url, params) values(?, ?, ?, ?, ?, ?, ?, ?, ?)"; 
            try { 
                queryRunner.update(sql, page.getId(), 
                        page.getSource(), 
                        page.getBrand(), 
                        page.getTitle(), 
                        page.getPrice(), 
                        page.getCommentCount(), 
                        page.getUrl(), 
                        page.getImgUrl(), 
                        page.getParams()); 
            } catch (SQLException e) { 
                e.printStackTrace(); 
            } 
        } 
    } 

而 HBase 的儲存實現類,則是 HBase Java API 的常用插入語句程式碼:

    ...... 
    // cf1:price 
    Put pricePut = new Put(rowKey); 
    // 必須要做是否為null判斷,否則會有空指標異常 
    pricePut.addColumn(cf1, "price".getBytes(), page.getPrice() != null ? String.valueOf(page.getPrice()).getBytes() : "".getBytes()); 
    puts.add(pricePut); 
    // cf1:comment 
    Put commentPut = new Put(rowKey); 
    commentPut.addColumn(cf1, "comment".getBytes(), page.getCommentCount() != null ? String.valueOf(page.getCommentCount()).getBytes() : "".getBytes()); 
    puts.add(commentPut); 
    // cf1:brand 
    Put brandPut = new Put(rowKey); 
    brandPut.addColumn(cf1, "brand".getBytes(), page.getBrand() != null ? page.getBrand().getBytes() : "".getBytes()); 
    puts.add(brandPut); 
    ...... 

當然,至於要將資料儲存在哪個地方,在初始化爬蟲程式時,是可以手動選擇的:

    // 3.注入儲存器 
    iSpider.setStore(new HBaseStoreImpl()); 

目前還沒有把程式碼寫成可以同時儲存在多個地方,按照目前程式碼的架構,要實現這一點也比較簡單,修改一下相應程式碼就好了。

實際上,是可以先把資料儲存到 MySQL 中,然後通過 Sqoop 匯入到 HBase 中。

仍然需要注意的是,如果確定需要將資料儲存到 HBase 中,請保證你有可用的叢集環境,並且需要將如下配置文件新增到 classpath 下:

  1. core-site.xml 
  2. hbase-site.xml 
  3. hdfs-site.xml 

對大資料感興趣的同學可以折騰一下這一點,如果之前沒有接觸過的,直接使用 MySQL 儲存就好了,只需要在初始化爬蟲程式時注入 MySQL 儲存器即可:

    // 3.注入儲存器 
    iSpider.setStore(new MySQLStoreImpl()); 

URL 排程系統

URL 排程系統是實現整個爬蟲系統分散式的橋樑與關鍵,正是通過 URL 排程系統的使用,才使得整個爬蟲系統可以較為高效(Redis 作為儲存)隨機地獲取 URL,並實現整個系統的分散式。

URL 倉庫

通過架構圖可以看出,所謂的 URL 倉庫不過是 Redis 倉庫,即在我們的系統中使用 Redis 來儲存 URL 地址列表。

正是這樣,才能保證我們的程式實現分散式,只要儲存了 URL 是唯一的,這樣不管我們的爬蟲程式有多少個,最終儲存下來的資料都是隻有唯一一份的,而不會重複。

同時 URL 倉庫中的 URL 地址在獲取時的策略是通過佇列的方式來實現的,待會通過 URL 排程器的實現即可知道。

另外,在我們的 URL 倉庫中,主要儲存了下面的資料:

種子 URL 列表,Redis 的資料型別為 list

種子 URL 是持久化儲存的,一定時間後,由 URL 定時器通過種子 URL 獲取 URL,並將其注入到我們的爬蟲程式需要使用的高優先順序 URL 佇列中。

這樣就可以保證我們的爬蟲程式可以源源不斷地爬取資料而不需要中止程式的執行。

高優先順序 URL 佇列,Redis 的資料型別為 set

什麼是高優先順序 URL 佇列?其實它就是用來儲存列表 URL 的。那麼什麼是列表 URL 呢?

說白了就是一個列表中含有多個商品,以京東為例,我們開啟一個手機列表:

該地址中包含的不是一個具體商品的 URL,而是包含了多個我們需要爬取的資料(手機商品)的列表。

通過對每個高階 URL 的解析,我們可以獲取到非常多的具體商品 URL,而具體的商品 URL,就是低優先 URL,其會儲存到低優

先級 URL 佇列中。

那麼以這個系統為例,儲存的資料類似如下:

    jd.com.higher 
        --https://list.jd.com/list.html?cat=9987,653,655&page=1 
        ...  
    suning.com.higher 
        --https://list.suning.com/0-20006-0.html 
        ... 

低優先順序 URL 佇列,Redis 的資料型別為 set

低優先順序 URL 其實就是具體某個商品的 URL,如下面一個手機商品:

通過下載該 URL 的資料,並對其進行解析,就能夠獲取到我們想要的資料。

那麼以這個系統為例,儲存的資料類似如下:

    jd.com.lower 
        --https://item.jd.com/23545806622.html 
        ... 
    suning.com.lower 
        --https://product.suning.com/0000000000/690128156.html 
        ... 

URL 排程器

所謂 URL 排程器,就是 URL 倉庫 Java 程式碼的排程策略,不過因為其核心在於排程,所以將其放到 URL 排程器中來進行說明,目前其排程基於以下介面開發:

    /** 
     * url 倉庫 
     * 主要功能: 
     *      向倉庫中新增url(高優先順序的列表,低優先順序的商品url) 
     *      從倉庫中獲取url(優先獲取高優先順序的url,如果沒有,再獲取低優先順序的url) 
     * 
     */ 
    public interface IRepository { 
     
        /** 
         * 獲取url的方法 
         * 從倉庫中獲取url(優先獲取高優先順序的url,如果沒有,再獲取低優先順序的url) 
         * @return 
         */ 
        public String poll(); 
     
        /** 
         * 向高優先順序列表中新增商品列表url 
         * @param highUrl 
         */ 
        public void offerHigher(String highUrl); 
     
        /** 
         * 向低優先順序列表中新增商品url 
         * @param lowUrl 
         */ 
        public void offerLower(String lowUrl); 
     
    } 

其基於 Redis 作為 URL 倉庫的實現如下:

    /** 
     * 基於Redis的全網爬蟲,隨機獲取爬蟲url: 
     * 
     * Redis中用來儲存url的資料結構如下: 
     * 1.需要爬取的域名集合(儲存資料型別為set,這個需要先在Redis中新增) 
     *      key 
     *          spider.website.domains 
     *      value(set) 
     *          jd.com  suning.com  gome.com 
     *      key由常量物件SpiderConstants.SPIDER_WEBSITE_DOMAINS_KEY 獲得 
     * 2.各個域名所對應的高低優先url佇列(儲存資料型別為list,這個由爬蟲程式解析種子url後動態新增) 
     *      key 
     *          jd.com.higher 
     *          jd.com.lower 
     *          suning.com.higher 
     *          suning.com.lower 
     *          gome.com.higher 
     *          gome.come.lower 
     *      value(list) 
     *          相對應需要解析的url列表 
     *      key由隨機的域名 + 常量 SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX或者SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX獲得 
     * 3.種子url列表 
     *      key 
     *          spider.seed.urls 
     *      value(list) 
     *          需要爬取的資料的種子url 
     *       key由常量SpiderConstants.SPIDER_SEED_URLS_KEY獲得 
     * 
     *       種子url列表中的url會由url排程器定時向高低優先url佇列中 
     */ 
    public class RandomRedisRepositoryImpl implements IRepository { 
     
        /** 
         * 構造方法 
         */ 
        public RandomRedisRepositoryImpl() { 
            init(); 
        } 
     
        /** 
         * 初始化方法,初始化時,先將redis中存在的高低優先順序url佇列全部刪除 
         * 否則上一次url佇列中的url沒有消耗完時,再停止啟動跑下一次,就會導致url倉庫中有重複的url 
         */ 
        public void init() { 
            Jedis jedis = JedisUtil.getJedis(); 
            Set<String> domains = jedis.smembers(SpiderConstants.SPIDER_WEBSITE_DOMAINS_KEY); 
            String higherUrlKey; 
            String lowerUrlKey; 
            for(String domain : domains) { 
                higherUrlKey = domain + SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX; 
                lowerUrlKey = domain + SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX; 
                jedis.del(higherUrlKey, lowerUrlKey); 
            } 
            JedisUtil.returnJedis(jedis); 
        } 
     
        /** 
         * 從佇列中獲取url,目前的策略是: 
         *      1.先從高優先順序url佇列中獲取 
         *      2.再從低優先順序url佇列中獲取 
         *  對應我們的實際場景,應該是先解析完列表url再解析商品url 
         *  但是需要注意的是,在分散式多執行緒的環境下,肯定是不能完全保證的,因為在某個時刻高優先順序url佇列中 
         *  的url消耗完了,但實際上程式還在解析下一個高優先順序url,此時,其它執行緒去獲取高優先順序佇列url肯定獲取不到 
         *  這時就會去獲取低優先順序佇列中的url,在實際考慮分析時,這點尤其需要注意 
         * @return 
         */ 
        @Override 
        public String poll() { 
            // 從set中隨機獲取一個頂級域名 
            Jedis jedis = JedisUtil.getJedis(); 
            String randomDomain = jedis.srandmember(SpiderConstants.SPIDER_WEBSITE_DOMAINS_KEY);    // jd.com 
            String key = randomDomain + SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX;                // jd.com.higher 
            String url = jedis.lpop(key); 
            if(url == null) {   // 如果為null,則從低優先順序中獲取 
                key = randomDomain + SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX;    // jd.com.lower 
                url = jedis.lpop(key); 
            } 
            JedisUtil.returnJedis(jedis); 
            return url; 
        } 
     
        /** 
         * 向高優先順序url佇列中新增url 
         * @param highUrl 
         */ 
        @Override 
        public void offerHigher(String highUrl) { 
            offerUrl(highUrl, SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX); 
        } 
     
        /** 
         * 向低優先url佇列中新增url 
         * @param lowUrl 
         */ 
        @Override 
        public void offerLower(String lowUrl) { 
            offerUrl(lowUrl, SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX); 
        } 
     
        /** 
         * 新增url的通用方法,通過offerHigher和offerLower抽象而來 
         * @param url   需要新增的url 
         * @param urlTypeSuffix  url型別字尾.higher或.lower 
         */ 
        public void offerUrl(String url, String urlTypeSuffix) { 
            Jedis jedis = JedisUtil.getJedis(); 
            String domain = SpiderUtil.getTopDomain(url);   // 獲取url對應的頂級域名,如jd.com 
            String key = domain + urlTypeSuffix;            // 拼接url佇列的key,如jd.com.higher 
            jedis.lpush(key, url);                          // 向url佇列中新增url 
            JedisUtil.returnJedis(jedis); 
        } 
    } 

通過程式碼分析也可以知道,其核心就在如何排程 URL 倉庫(Redis)中的 URL。

URL 定時器

一段時間後,高優先順序 URL 佇列和低優先 URL 佇列中的 URL 都會被消費完。

為了讓程式可以繼續爬取資料,同時減少人為的干預,可以預先在 Redis 中插入種子 URL,之後定時讓 URL 定時器從種子 URL 中取出 URL 存放到高優先順序 URL 佇列中,以此達到程式定時不間斷爬取資料的目的。

URL 消費完畢後,是否需要迴圈不斷爬取資料根據個人業務需求而不同,因此這一步不是必需的,只是也提供了這樣的操作。

因為事實上,我們需要爬取的資料也是每隔一段時間就會更新的,如果希望我們爬取的資料也跟著定時更新,那麼這時定時器就有非常重要的作用了。

不過需要注意的是,一旦決定需要迴圈重複爬取資料,則在設計儲存器實現時需要考慮重複資料的問題,即重複資料應該是更新操作。

目前在我設計的儲存器不包括這個功能,有興趣的朋友可以自己實現,只需要在插入資料前判斷資料庫中是否存在該資料即可。

另外需要注意的一點是,URL 定時器是一個獨立的程序,需要單獨啟動。

定時器基於 Quartz 實現,下面是其 job 的程式碼:

    /** 
     * 每天定時從url倉庫中獲取種子url,新增進高優先順序列表 
     */ 
    public class UrlJob implements Job { 
     
        // log4j日誌記錄 
        private Logger logger = LoggerFactory.getLogger(UrlJob.class); 
     
        @Override 
        public void execute(JobExecutionContext context) throws JobExecutionException { 
            /** 
             * 1.從指定url種子倉庫獲取種子url 
             * 2.將種子url新增進高優先順序列表 
             */ 
            Jedis jedis = JedisUtil.getJedis(); 
            Set<String> seedUrls = jedis.smembers(SpiderConstants.SPIDER_SEED_URLS_KEY);  // spider.seed.urls Redis資料型別為set,防止重複新增種子url 
            for(String seedUrl : seedUrls) { 
                String domain = SpiderUtil.getTopDomain(seedUrl);   // 種子url的頂級域名 
                jedis.sadd(domain + SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX, seedUrl); 
                logger.info("獲取種子:{}", seedUrl); 
            } 
            JedisUtil.returnJedis(jedis); 
    //        System.out.println("Scheduler Job Test..."); 
        } 
     
    } 

 排程器實現如下:

    /** 
     * url定時排程器,定時向url對應倉庫中存放種子url 
     * 
     * 業務規定:每天凌晨1點10分向倉庫中存放種子url 
     */ 
    public class UrlJobScheduler { 
     
        public UrlJobScheduler() { 
            init(); 
        } 
     
        /** 
         * 初始化排程器 
         */ 
        public void init() { 
            try { 
                Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler(); 
     
                // 如果沒有以下start方法的執行,則是不會開啟任務的排程 
                scheduler.start(); 
     
                String name = "URL_SCHEDULER_JOB"; 
                String group = "URL_SCHEDULER_JOB_GROUP"; 
                JobDetail jobDetail = new JobDetail(name, group, UrlJob.class); 
                String cronExpression = "0 10 1 * * ?"; 
                Trigger trigger = new CronTrigger(name, group, cronExpression); 
     
                // 排程任務 
                scheduler.scheduleJob(jobDetail, trigger); 
     
            } catch (SchedulerException e) { 
                e.printStackTrace(); 
            } catch (ParseException e) { 
                e.printStackTrace(); 
            } 
        } 
     
        public static void main(String[] args) { 
            UrlJobScheduler urlJobScheduler = new UrlJobScheduler(); 
            urlJobScheduler.start(); 
        } 
     
        /** 
         * 定時排程任務 
         * 因為我們每天要定時從指定的倉庫中獲取種子url,並存放到高優先順序的url列表中 
         * 所以是一個不間斷的程式,所以不能停止 
         */ 
        private void start() { 
            while (true) { 
     
            } 
        } 
    } 

監控系統

監控報警系統的加入主要是為了讓使用者可以主動發現節點宕機,而不是被動地發現,因為實際中爬蟲程式可能是持續不斷執行的。

並且我們會在多個節點上部署我們的爬蟲程式,因此很有必要對節點進行監控,並且在節點出現問題時可以及時發現並修正,需要注意的是,監控報警系統是一個獨立的程序,需要單獨啟動。

基本原理

首先需要先在 ZooKeeper 中建立一個 /ispider 節點:

    [zk: localhost:2181(CONNECTED) 1] create /ispider ispider 
    Created /ispider 

 監控報警系統的開發主要依賴於 ZooKeeper 實現,監控程式對 ZooKeeper  下面的這個節點目錄進行監聽:

    [zk: localhost:2181(CONNECTED) 0] ls /ispider  
    [] 

爬蟲程式啟動時會在該節點目錄下注冊一個臨時節點目錄:

    [zk: localhost:2181(CONNECTED) 0] ls /ispider  
    [192.168.43.166] 

當節點出現宕機時,該臨時節點目錄就會被 ZooKeeper 刪除。

    [zk: localhost:2181(CONNECTED) 0] ls /ispider 
    [] 

同時因為我們監聽了節點目錄 /ispider,所以當 ZooKeeper 刪除其下的節點目錄時(或增加一個節點目錄),ZooKeeper 會給我們的監控程式傳送通知。

即我們的監控程式會得到回撥,這樣便可以在回撥程式中執行報警的系統動作,從而完成監控報警的功能。

ZooKeeper Java API 使用說明

可以使用 ZooKeeper 原生的 Java API,我在另外寫的一個 RPC 框架(底層基於 Netty 實現遠端通訊)中就是使用原生的 API。

不過顯然程式碼會複雜很多,並且本身需要對 ZooKeeper 有更多的學習和了解,這樣用起來才會容易一些。

所以為了降低開發的難度,這裡使用第三方封裝的 API,即 curator,來進行 ZooKeeper 客戶端程式的開發。

爬蟲系統 ZooKeeper 註冊

在啟動爬蟲系統時,我們的程式都會啟動一個 ZooKeeper 客戶端來向 ZooKeeper 來註冊自身的節點資訊,主要是 IP 地址。

並在 /ispider 節點目錄建立一個以該爬蟲程式所在的節點 IP 地址命名的節點,如 /ispider/192.168.43.116,實現的程式碼如下:

    /** 
     * 註冊zk 
     */ 
    private void registerZK() { 
        String zkStr = "uplooking01:2181,uplooking02:2181,uplooking03:2181"; 
        int baseSleepTimeMs = 1000; 
        int maxRetries = 3; 
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries); 
        CuratorFramework curator = CuratorFrameworkFactory.newClient(zkStr, retryPolicy); 
        curator.start(); 
        String ip = null; 
        try { 
            // 向zk的具體目錄註冊 寫節點 建立節點 
            ip = InetAddress.getLocalHost().getHostAddress(); 
            curator.create().withMode(CreateMode.EPHEMERAL).forPath("/ispider/" + ip, ip.getBytes()); 
        } catch (UnknownHostException e) { 
            e.printStackTrace(); 
        } catch (Exception e) { 
            e.printStackTrace(); 
        } 
    } 

應該注意到的是,我們建立的節點為臨時節點,要想實現監控報警功能,必須要為臨時節點。

監控程式

首先需要先監聽 ZooKeeper 中的一個節點目錄,在我們的系統中,設計是監聽 /ispider 這個節點目錄:

    public SpiderMonitorTask() { 
        String zkStr = "uplooking01:2181,uplooking02:2181,uplooking03:2181"; 
        int baseSleepTimeMs = 1000; 
        int maxRetries = 3; 
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries); 
        curator = CuratorFrameworkFactory.newClient(zkStr, retryPolicy); 
        curator.start(); 
        try { 
            previousNodes = curator.getChildren().usingWatcher(this).forPath("/ispider"); 
        } catch (Exception e) { 
            e.printStackTrace(); 
        } 
    } 

在上面註冊了 ZooKeeper 中的 watcher,也就是接收通知的回撥程式,在該程式中,執行我們報警的邏輯:

    /** 
     * 這個方法,當監控的zk對應的目錄一旦有變動,就會被呼叫 
     * 得到當前最新的節點狀態,將最新的節點狀態和初始或者上一次的節點狀態作比較,那我們就知道了是由誰引起的節點變化 
     * @param event 
     */ 
    @Override 
    public void process(WatchedEvent event) { 
        try { 
            List<String> currentNodes = curator.getChildren().usingWatcher(this).forPath("/ispider"); 
            //            HashSet<String> previousNodesSet = new HashSet<>(previousNodes); 
            if(currentNodes.size() > previousNodes.size()) { // 最新的節點服務,超過之前的節點服務個數,有新的節點增加進來 
                for(String node : currentNodes) { 
                    if(!previousNodes.contains(node)) { 
                        // 當前節點就是新增節點 
                        logger.info("----有新的爬蟲節點{}新增進來", node); 
                    } 
                } 
            } else if(currentNodes.size() < previousNodes.size()) {  // 有節點掛了    傳送告警郵件或者簡訊 
                for(String node : previousNodes) { 
                    if(!currentNodes.contains(node)) { 
                        // 當前節點掛掉了 得需要發郵件 
                        logger.info("----有爬蟲節點{}掛掉了", node); 
                        MailUtil.sendMail("有爬蟲節點掛掉了,請人工檢視爬蟲節點的情況,節點資訊為:", node); 
                    } 
                } 
            } // 掛掉和新增的數目一模一樣,上面是不包括這種情況的,有興趣的朋友可以直接實現包括這種特殊情況的監控 
            previousNodes = currentNodes;   // 更新上一次的節點列表,成為最新的節點列表 
        } catch (Exception e) { 
            e.printStackTrace(); 
        } 
        // 在原生的API需要再做一次監控,因為每一次監控只會生效一次,所以當上面發現變化後,需要再監聽一次,這樣下一次才能監聽到 
        // 但是在使用curator的API時則不需要這樣做 
    } 

當然,判斷節點是否掛掉,上面的邏輯還是存在一定的問題的,按照上面的邏輯,假如某一時刻新增節點和刪除節點事件同時發生,那麼其就不能判斷出來,所以如果需要更精準的話,可以將上面的程式程式碼修改一下。

郵件傳送模組

使用模板程式碼就可以了,不過需要注意的是,在使用時,發件人的資訊請使用自己的郵箱。

下面是爬蟲節點掛掉時接收到的郵件:

實際上,如果購買了簡訊服務,那麼通過簡訊 API 也可以向我們的手機發送簡訊。

實戰:爬取京東、蘇寧易購全網手機商品資料

因為前面在介紹這個系統的時候也提到了,我只寫了京東和蘇寧易購的網頁解析器,所以接下來也就是爬取其全網的手機商品資料。

環境說明

需要確保 Redis、ZooKeeper 服務可用,另外如果需要使用 HBase 來儲存資料,需要確保 Hadoop 叢集中的 HBase 可用,並且相關配置檔案已經加入到爬蟲程式的 classpath 中。

還有一點需要注意的是,URL 定時器和監控報警系統是作為單獨的程序來執行的,並且也是可選的。

爬蟲結果

進行了兩次爬取,分別嘗試將資料儲存到 MySQL 和 HBase 中,給出如下資料情況。

儲存到 MySQL

    mysql> select count(*) from phone; 
    +----------+ 
    | count(*) | 
    +----------+ 
    |    12052 | 
    +----------+ 
    1 row in set 
     
    mysql> select count(*) from phone where source='jd.com'; 
    +----------+ 
    | count(*) | 
    +----------+ 
    |     9578 | 
    +----------+ 
    1 row in set 
     
    mysql> select count(*) from phone where source='suning 
    .com'; 
    +----------+ 
    | count(*) | 
    +----------+ 
    |     2474 | 
    +----------+ 
    1 row in set 

在視覺化工具中檢視資料情況:

儲存到 HBase

    hbase(main):225:0* count 'phone' 
    Current count: 1000, row: 11155386088_jd.com 
    Current count: 2000, row: 136191393_suning.com 
    Current count: 3000, row: 16893837301_jd.com 
    Current count: 4000, row: 19036619855_jd.com 
    Current count: 5000, row: 1983786945_jd.com 
    Current count: 6000, row: 1997392141_jd.com 
    Current count: 7000, row: 21798495372_jd.com 
    Current count: 8000, row: 24154264902_jd.com 
    Current count: 9000, row: 25687565618_jd.com 
    Current count: 10000, row: 26458674797_jd.com 
    Current count: 11000, row: 617169906_suning.com 
    Current count: 12000, row: 769705049_suning.com                  
    12348 row(s) in 1.5720 seconds 
    => 12348 

在 HDFS 中檢視資料情況:

資料量與實際情況分析

京東:京東手機的列表大概有 160 多頁,每個列表有 60 個商品資料,所以總量在 9600 左右,我們的資料基本是符合的。

後面通過日誌分析可以知道,一般丟失的資料為連線超時導致的,所以在選取爬蟲的環境時,更建議在網路環境好的主機上進行。

同時如果可以有 IP 代理地址庫就更好了,另外對於連線超時的情況,可以進一步在我們的程式中加以控制。

一旦出現爬取資料失敗的 URL,可以將其加入到重試 URL 佇列中,目前這一點功能我是沒有做,有興趣的同學可以試一下。

蘇寧易購:再來看看蘇寧的資料,其有 100 頁左右的手機列表,每頁也是 60 個商品資料,所以總量在 6000 左右。

但可以看到,我們的資料卻只有 3000 這樣的數量級(缺少的依然是頻繁爬取造成的連線失敗問題),這是為什麼呢?

這是因為,開啟蘇寧的某個列表頁面後,其是先載入 30 個商品,當滑鼠向下滑動時,才會通過另外的 API 去載入其他的 30 個商品資料,每一個列表頁面都是如此,所以,實際上,我們是缺少了一半的商品資料沒有爬取。

知道這個原因之後,實現也不難,但是因為時間關係,我就沒有做了,有興趣的朋友折騰一下吧。

通過日誌分析爬蟲系統的效能

在我們的爬蟲系統中,每個關鍵的地方,如網頁下載、資料解析等都是有打 logger 的,所以通過日誌,可以大概分析出相關的時間引數。

  1. 2018-04-01 21:26:03 [pool-1-thread-1] [cn.xpleaf.spider.utils.HttpUtil] [INFO] - 下載網頁:https://list.jd.com/list.html?cat=9987,653,655&page=1,消耗時長:590 ms,代理資訊:null:null 
  2. 2018-04-01 21:26:03 [pool-1-thread-1] [cn.xpleaf.spider.core.parser.Impl.JDHtmlParserImpl] [INFO] - 解析列表頁面:https://list.jd.com/list.html?cat=9987,653,655&page=1, 消耗時長:46ms 
  3. 2018-04-01 21:26:03 [pool-1-thread-3] [cn.xpleaf.spider.core.parser.Impl.SNHtmlParserImpl] [INFO] - 解析列表頁面:https://list.suning.com/0-20006-0.html, 消耗時長:49ms 
  4. 2018-04-01 21:26:04 [pool-1-thread-5] [cn.xpleaf.spider.utils.HttpUtil] [INFO] - 下載網頁:https://item.jd.com/6737464.html,消耗時長:219 ms,代理資訊:null:null 
  5. 2018-04-01 21:26:04 [pool-1-thread-2] [cn.xpleaf.spider.utils.HttpUtil] [INFO] - 下載網頁:https://list.jd.com/list.html?cat=9987,653,655&page=2&sort=sort_rank_asc&trans=1&JL=6_0_0,消耗時長:276 ms,代理資訊:null:null 
  6. 2018-04-01 21:26:04 [pool-1-thread-4] [cn.xpleaf.spider.utils.HttpUtil] [INFO] - 下載網頁:https://list.suning.com/0-20006-99.html,消耗時長:300 ms,代理資訊:null:null 
  7. 2018-04-01 21:26:04 [pool-1-thread-4] [cn.xpleaf.spider.core.parser.Impl.SNHtmlParserImpl] [INFO] - 解析列表頁面:https://list.suning.com/0-20006-99.html, 消耗時長:4ms 
  8. ...... 
  9. 2018-04-01 21:27:49 [pool-1-thread-3] [cn.xpleaf.spider.utils.HttpUtil] [INFO] - 下載網頁:https://club.jd.com/comment/productCommentSummaries.action?referenceIds=23934388891,消耗時長:176 ms,代理資訊:null:null 
  10. 2018-04-01 21:27:49 [pool-1-thread-3] [cn.xpleaf.spider.core.parser.Impl.JDHtmlParserImpl] [INFO] - 解析商品頁面:https://item.jd.com/23934388891.html, 消耗時長:413ms 
  11. 2018-04-01 21:27:49 [pool-1-thread-2] [cn.xpleaf.spider.utils.HttpUtil] [INFO] - 下載網頁:https://review.suning.com/ajax/review_satisfy/general-00000000010017793337-0070079092-----satisfy.htm,消耗時長:308 ms,代理資訊:null:null 
  12. 2018-04-01 21:27:49 [pool-1-thread-2] [cn.xpleaf.spider.core.parser.Impl.SNHtmlParserImpl] [INFO] - 解析商品頁面:https://product.suning.com/0070079092/10017793337.html, 消耗時長:588ms 
  13. ...... 

平均下來,下載一個商品網頁資料的時間在 200~500 毫秒不等,當然這個還需要取決於當時的網路情況。

另外,如果想要真正計算爬取一個商品的時間資料,可以通過日誌下面的資料來計算:

  • 下載一個商品頁面資料的時間
  • 獲取價格資料的時間
  • 獲取評論資料的時間

在我的主機上(CPU:E5 10核心,記憶體:32GB,分別開啟 1 個虛擬機器和 3 個虛擬機器),情況如下:

可以看到,當使用 3 個節點時,時間並不會相應地縮小為原來的 1/3,這是因為此時影響爬蟲效能的問題主要是網路問題,節點數量多,執行緒數量大,網路請求也多。

但是頻寬一定,並且在沒有使用代理的情況,請求頻繁,連線失敗的情況也會增多,對時間也有一定的影響,如果使用隨機代理庫,情況將會好很多。

但可以肯定的是,在橫向擴充套件增加爬蟲節點之後,確實可以大大縮小我們的爬蟲時間,這也是分散式爬蟲系統的好處。

爬蟲系統中使用的反反爬蟲策略

在整個爬蟲系統的設計中,主要使用下面的策略來達到反反爬蟲的目的:

  • 使用代理來訪問-->IP 代理庫,隨機 IP 代理。
  • 隨機頂級域名url 訪問-->url 排程系統。
  • 每個執行緒每爬取完一條商品資料 sleep 一小段時間再進行爬取。

總結

需要說明的是,本系統是基於 Java 實現的,但個人覺得,語言本身依然不是問題,核心在於對整個系統的設計以及理解上。